日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

muduo源碼分析之TcpServer模塊詳細介紹_Redis

作者:小坤學習園 ? 更新時間: 2022-06-23 編程語言

這次我們開始muduo源代碼的實際編寫,首先我們知道muduoLT模式,Reactor模式,下圖為Reactor模式的流程圖[來源1]

然后我們來看下muduo的整體架構[來源1]

首先muduo有一個主反應堆mainReactor以及幾個子反應堆subReactor,其中子反應堆的個數由用戶使用setThreadNum函數設置,mainReactor中主要有一個Acceptor,當用戶建立新的連接的時候,Acceptor會將connfd和對應的事件打包為一個channel然后采用輪詢的算法,指定將該channel給所選擇的subReactor,以后該subReactor就負責該channel的所有工作。

TcpServer類

我們按照從上到下的思路進行講解,以下內容我們按照一個簡單的EchoServer的實現思路來講解,我們知道當我們自己實現一個Server的時候,會在構造函數中實例化一個TcpServer

EchoServer(EventLoop *loop,
           const InetAddress &addr, 
           const std::string &name)
    : server_(loop, addr, name)
        , loop_(loop)
    {
        // 注冊回調函數
        server_.setConnectionCallback(
            std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
        );

        server_.setMessageCallback(
            std::bind(&EchoServer::onMessage, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
        // 設置合適的loop線程數量 loopthread 不包括baseloop
        server_.setThreadNum(3);
    }

于是我們去看下TcpServer的構造函數是在干什么

TcpServer::TcpServer(EventLoop *loop,
                const InetAddress &listenAddr,
                const std::string &nameArg,
                Option option)
                : loop_(CheckLoopNotNull(loop))
                , ipPort_(listenAddr.toIpPort())
                , name_(nameArg)
                , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
                , threadPool_(new EventLoopThreadPool(loop, name_))
                , connectionCallback_()
                , messageCallback_()
                , nextConnId_(1)
                , started_(0)
{
    // 當有新用戶連接時候,會執行該回調函數
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, 
        std::placeholders::_1, std::placeholders::_2));
}

我們只需要關注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點,構造了一個Acceptor,我們首先要知道Acceptor主要就是連接新用戶并打包為一個Channel,所以我們就應該知道Acceptor按道理應該實現socketbindlistenaccept這四個函數。

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop), acceptSocket_(createNonblocking()) // socket
      ,
      acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr); // 綁定套接字
    // 有新用戶的連接,執行一個回調(打包為channel)
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

其中Acceptor中有個acceptSocket_,其實就是我們平時所用的listenfd,構造函數中實現了socketbind,而其余的兩個函數的使用在其余代碼

// 開啟服務器監聽
void TcpServer::start()
{
	// 防止一個TcpServer被start多次
    if (started_++ == 0) 
    {
        threadPool_->start(threadInitCallback_); // 啟動底層的loop線程池,這里會按照設定了threadnum設置pool的數量
        loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

我們知道,當我們設置了threadnum之后,就會有一個mainloop,那么這個loop_就是那個mainloop,其中可以看見這個loop_就只做一個事情Acceptor::listen

void Acceptor::listen()
{
    listenning_ = true;
    acceptSocket_.listen();         // listen
    acceptChannel_.enableReading(); // acceptChannel_ => Poller
}

這里就實現了listen函數,還有最后一個函數accept,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()之后就會使得這個listenfd所在的channel對讀事件感興趣,那什么時候會有讀事件呢,就是當用戶建立新連接的時候,那么我們應該想一下,那當感興趣的事件發生之后,listenfd應該干什么呢,應該執行一個回調函數呀。注意Acceptor構造函數中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));這就是那個回調,我們去看下handleRead在干嘛。

// listenfd有事件發生了,就是有新用戶連接了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // 若用戶實現定義了,則執行,否則說明用戶對新到來的連接沒有需要執行的,所以直接關閉
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發當前的新客戶端的Channel
        }
        else
        {
            ::close(connfd);
        }
    }
    ...
}

這里是不是就實現了accept函數,至此當用戶建立一個新的連接時候,Acceptor就會得到一個connfd和其對應的peerAddr返回給mainloop,這時候我們在注意到TcpServer構造函數中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));我們給acceptor_設置了一個newConnectionCallback_,于是由上面的代碼就可以知道,if (newConnectionCallback_)為真,就會執行這個回調函數,于是就會執行TcpServer::newConnection,我們去看下這個函數是在干嘛。

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    // 輪詢算法選擇一個subloop來管理對應的這個新連接
    EventLoop *ioLoop = threadPool_->getNextLoop(); 
    char buf[64] = {0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
    // 通過sockfd獲取其綁定的本地ip和端口
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);
    // 根據連接成功的sockfd,創建TcpConnection
    TcpConnectionPtr conn(new TcpConnection(
                            ioLoop,
                            connName,
                            sockfd,   // Socket Channel
                            localAddr,
                            peerAddr));
    connections_[connName] = conn;
	// 下面的回調時用戶設置給TcpServer,TcpServer又設置給TcpConnection,TcpConnetion又設置給Channel,Channel又設置給Poller,Poller通知channel調用這個回調
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    // 設置了如何關閉連接的回調
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
    );
    // 直接調用connectEstablished
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

這里就比較長了,我先說下大概他干了啥事情:首先通過輪詢找到下一個subloop,然后將剛剛返回的connfd和對應的peerAddr以及localAddr構造為一個TcpConnectionsubloop,然后給這個conn設置了一系列的回調函數,比如讀回調,寫回調,斷開回調等等。下一章我們來說下上面的代碼最后幾行在干嘛。

原文鏈接:https://www.cnblogs.com/shilinkun/archive/2022/04/24/16187509.html

欄目分類
最近更新