日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

muduo源碼分析之TcpServer模塊詳細(xì)介紹_Redis

作者:小坤學(xué)習(xí)園 ? 更新時(shí)間: 2022-06-23 編程語(yǔ)言

這次我們開始muduo源代碼的實(shí)際編寫,首先我們知道muduoLT模式,Reactor模式,下圖為Reactor模式的流程圖[來(lái)源1]

然后我們來(lái)看下muduo的整體架構(gòu)[來(lái)源1]

首先muduo有一個(gè)主反應(yīng)堆mainReactor以及幾個(gè)子反應(yīng)堆subReactor,其中子反應(yīng)堆的個(gè)數(shù)由用戶使用setThreadNum函數(shù)設(shè)置,mainReactor中主要有一個(gè)Acceptor,當(dāng)用戶建立新的連接的時(shí)候,Acceptor會(huì)將connfd和對(duì)應(yīng)的事件打包為一個(gè)channel然后采用輪詢的算法,指定將該channel給所選擇的subReactor,以后該subReactor就負(fù)責(zé)該channel的所有工作。

TcpServer類

我們按照從上到下的思路進(jìn)行講解,以下內(nèi)容我們按照一個(gè)簡(jiǎn)單的EchoServer的實(shí)現(xiàn)思路來(lái)講解,我們知道當(dāng)我們自己實(shí)現(xiàn)一個(gè)Server的時(shí)候,會(huì)在構(gòu)造函數(shù)中實(shí)例化一個(gè)TcpServer

EchoServer(EventLoop *loop,
           const InetAddress &addr, 
           const std::string &name)
    : server_(loop, addr, name)
        , loop_(loop)
    {
        // 注冊(cè)回調(diào)函數(shù)
        server_.setConnectionCallback(
            std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
        );

        server_.setMessageCallback(
            std::bind(&EchoServer::onMessage, this,
                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
        // 設(shè)置合適的loop線程數(shù)量 loopthread 不包括baseloop
        server_.setThreadNum(3);
    }

于是我們?nèi)タ聪?code>TcpServer的構(gòu)造函數(shù)是在干什么

TcpServer::TcpServer(EventLoop *loop,
                const InetAddress &listenAddr,
                const std::string &nameArg,
                Option option)
                : loop_(CheckLoopNotNull(loop))
                , ipPort_(listenAddr.toIpPort())
                , name_(nameArg)
                , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
                , threadPool_(new EventLoopThreadPool(loop, name_))
                , connectionCallback_()
                , messageCallback_()
                , nextConnId_(1)
                , started_(0)
{
    // 當(dāng)有新用戶連接時(shí)候,會(huì)執(zhí)行該回調(diào)函數(shù)
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, 
        std::placeholders::_1, std::placeholders::_2));
}

我們只需要關(guān)注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點(diǎn),構(gòu)造了一個(gè)Acceptor,我們首先要知道Acceptor主要就是連接新用戶并打包為一個(gè)Channel,所以我們就應(yīng)該知道Acceptor按道理應(yīng)該實(shí)現(xiàn)socketbind,listen,accept這四個(gè)函數(shù)。

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop), acceptSocket_(createNonblocking()) // socket
      ,
      acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(true);
    acceptSocket_.bindAddress(listenAddr); // 綁定套接字
    // 有新用戶的連接,執(zhí)行一個(gè)回調(diào)(打包為channel)
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

其中Acceptor中有個(gè)acceptSocket_,其實(shí)就是我們平時(shí)所用的listenfd,構(gòu)造函數(shù)中實(shí)現(xiàn)了socketbind,而其余的兩個(gè)函數(shù)的使用在其余代碼

// 開啟服務(wù)器監(jiān)聽
void TcpServer::start()
{
	// 防止一個(gè)TcpServer被start多次
    if (started_++ == 0) 
    {
        threadPool_->start(threadInitCallback_); // 啟動(dòng)底層的loop線程池,這里會(huì)按照設(shè)定了threadnum設(shè)置pool的數(shù)量
        loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

我們知道,當(dāng)我們?cè)O(shè)置了threadnum之后,就會(huì)有一個(gè)mainloop,那么這個(gè)loop_就是那個(gè)mainloop,其中可以看見這個(gè)loop_就只做一個(gè)事情Acceptor::listen。

void Acceptor::listen()
{
    listenning_ = true;
    acceptSocket_.listen();         // listen
    acceptChannel_.enableReading(); // acceptChannel_ => Poller
}

這里就實(shí)現(xiàn)了listen函數(shù),還有最后一個(gè)函數(shù)accept,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()之后就會(huì)使得這個(gè)listenfd所在的channel對(duì)讀事件感興趣,那什么時(shí)候會(huì)有讀事件呢,就是當(dāng)用戶建立新連接的時(shí)候,那么我們應(yīng)該想一下,那當(dāng)感興趣的事件發(fā)生之后,listenfd應(yīng)該干什么呢,應(yīng)該執(zhí)行一個(gè)回調(diào)函數(shù)呀。注意Acceptor構(gòu)造函數(shù)中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));這就是那個(gè)回調(diào),我們?nèi)タ聪?code>handleRead在干嘛。

// listenfd有事件發(fā)生了,就是有新用戶連接了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // 若用戶實(shí)現(xiàn)定義了,則執(zhí)行,否則說(shuō)明用戶對(duì)新到來(lái)的連接沒有需要執(zhí)行的,所以直接關(guān)閉
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發(fā)當(dāng)前的新客戶端的Channel
        }
        else
        {
            ::close(connfd);
        }
    }
    ...
}

這里是不是就實(shí)現(xiàn)了accept函數(shù),至此當(dāng)用戶建立一個(gè)新的連接時(shí)候,Acceptor就會(huì)得到一個(gè)connfd和其對(duì)應(yīng)的peerAddr返回給mainloop,這時(shí)候我們?cè)谧⒁獾?code>TcpServer構(gòu)造函數(shù)中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));我們給acceptor_設(shè)置了一個(gè)newConnectionCallback_,于是由上面的代碼就可以知道,if (newConnectionCallback_)為真,就會(huì)執(zhí)行這個(gè)回調(diào)函數(shù),于是就會(huì)執(zhí)行TcpServer::newConnection,我們?nèi)タ聪逻@個(gè)函數(shù)是在干嘛。

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    // 輪詢算法選擇一個(gè)subloop來(lái)管理對(duì)應(yīng)的這個(gè)新連接
    EventLoop *ioLoop = threadPool_->getNextLoop(); 
    char buf[64] = {0};
    snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
    // 通過sockfd獲取其綁定的本地ip和端口
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);
    // 根據(jù)連接成功的sockfd,創(chuàng)建TcpConnection
    TcpConnectionPtr conn(new TcpConnection(
                            ioLoop,
                            connName,
                            sockfd,   // Socket Channel
                            localAddr,
                            peerAddr));
    connections_[connName] = conn;
	// 下面的回調(diào)時(shí)用戶設(shè)置給TcpServer,TcpServer又設(shè)置給TcpConnection,TcpConnetion又設(shè)置給Channel,Channel又設(shè)置給Poller,Poller通知channel調(diào)用這個(gè)回調(diào)
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    // 設(shè)置了如何關(guān)閉連接的回調(diào)
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
    );
    // 直接調(diào)用connectEstablished
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

這里就比較長(zhǎng)了,我先說(shuō)下大概他干了啥事情:首先通過輪詢找到下一個(gè)subloop,然后將剛剛返回的connfd和對(duì)應(yīng)的peerAddr以及localAddr構(gòu)造為一個(gè)TcpConnectionsubloop,然后給這個(gè)conn設(shè)置了一系列的回調(diào)函數(shù),比如讀回調(diào),寫回調(diào),斷開回調(diào)等等。下一章我們來(lái)說(shuō)下上面的代碼最后幾行在干嘛。

原文鏈接:https://www.cnblogs.com/shilinkun/archive/2022/04/24/16187509.html

欄目分類
最近更新