網站首頁 編程語言 正文
這次我們開始muduo
源代碼的實際編寫,首先我們知道muduo
是LT
模式,Reactor
模式,下圖為Reactor
模式的流程圖[來源1]
然后我們來看下muduo
的整體架構[來源1]
首先muduo
有一個主反應堆mainReactor
以及幾個子反應堆subReactor
,其中子反應堆的個數由用戶使用setThreadNum
函數設置,mainReactor
中主要有一個Acceptor
,當用戶建立新的連接的時候,Acceptor
會將connfd
和對應的事件打包為一個channel
然后采用輪詢的算法,指定將該channel
給所選擇的subReactor
,以后該subReactor
就負責該channel
的所有工作。
TcpServer類
我們按照從上到下的思路進行講解,以下內容我們按照一個簡單的EchoServer
的實現思路來講解,我們知道當我們自己實現一個Server
的時候,會在構造函數中實例化一個TcpServer
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name) : server_(loop, addr, name) , loop_(loop) { // 注冊回調函數 server_.setConnectionCallback( std::bind(&EchoServer::onConnection, this, std::placeholders::_1) ); server_.setMessageCallback( std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3) // 設置合適的loop線程數量 loopthread 不包括baseloop server_.setThreadNum(3); }
于是我們去看下TcpServer
的構造函數是在干什么
TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option) : loop_(CheckLoopNotNull(loop)) , ipPort_(listenAddr.toIpPort()) , name_(nameArg) , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)) , threadPool_(new EventLoopThreadPool(loop, name_)) , connectionCallback_() , messageCallback_() , nextConnId_(1) , started_(0) { // 當有新用戶連接時候,會執行該回調函數 acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); }
我們只需要關注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點,構造了一個Acceptor
,我們首先要知道Acceptor
主要就是連接新用戶并打包為一個Channel
,所以我們就應該知道Acceptor
按道理應該實現socket
,bind
,listen
,accept
這四個函數。
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport) : loop_(loop), acceptSocket_(createNonblocking()) // socket , acceptChannel_(loop, acceptSocket_.fd()), listenning_(false) { acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(true); acceptSocket_.bindAddress(listenAddr); // 綁定套接字 // 有新用戶的連接,執行一個回調(打包為channel) acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }
其中Acceptor
中有個acceptSocket_
,其實就是我們平時所用的listenfd
,構造函數中實現了socket
,bind
,而其余的兩個函數的使用在其余代碼
// 開啟服務器監聽 void TcpServer::start() { // 防止一個TcpServer被start多次 if (started_++ == 0) { threadPool_->start(threadInitCallback_); // 啟動底層的loop線程池,這里會按照設定了threadnum設置pool的數量 loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())); } }
我們知道,當我們設置了threadnum
之后,就會有一個mainloop
,那么這個loop_
就是那個mainloop
,其中可以看見這個loop_
就只做一個事情Acceptor::listen
。
void Acceptor::listen() { listenning_ = true; acceptSocket_.listen(); // listen acceptChannel_.enableReading(); // acceptChannel_ => Poller }
這里就實現了listen
函數,還有最后一個函數accept
,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()
之后就會使得這個listenfd
所在的channel
對讀事件感興趣,那什么時候會有讀事件呢,就是當用戶建立新連接的時候,那么我們應該想一下,那當感興趣的事件發生之后,listenfd
應該干什么呢,應該執行一個回調函數呀。注意Acceptor
構造函數中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
這就是那個回調,我們去看下handleRead
在干嘛。
// listenfd有事件發生了,就是有新用戶連接了 void Acceptor::handleRead() { InetAddress peerAddr; int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // 若用戶實現定義了,則執行,否則說明用戶對新到來的連接沒有需要執行的,所以直接關閉 if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發當前的新客戶端的Channel } else { ::close(connfd); } } ... }
這里是不是就實現了accept
函數,至此當用戶建立一個新的連接時候,Acceptor
就會得到一個connfd
和其對應的peerAddr
返回給mainloop
,這時候我們在注意到TcpServer
構造函數中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
我們給acceptor_
設置了一個newConnectionCallback_
,于是由上面的代碼就可以知道,if (newConnectionCallback_)
為真,就會執行這個回調函數,于是就會執行TcpServer::newConnection
,我們去看下這個函數是在干嘛。
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) { // 輪詢算法選擇一個subloop來管理對應的這個新連接 EventLoop *ioLoop = threadPool_->getNextLoop(); char buf[64] = {0}; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; std::string connName = name_ + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); // 通過sockfd獲取其綁定的本地ip和端口 sockaddr_in local; ::bzero(&local, sizeof local); socklen_t addrlen = sizeof local; if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0) { LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); // 根據連接成功的sockfd,創建TcpConnection TcpConnectionPtr conn(new TcpConnection( ioLoop, connName, sockfd, // Socket Channel localAddr, peerAddr)); connections_[connName] = conn; // 下面的回調時用戶設置給TcpServer,TcpServer又設置給TcpConnection,TcpConnetion又設置給Channel,Channel又設置給Poller,Poller通知channel調用這個回調 conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); // 設置了如何關閉連接的回調 conn->setCloseCallback( std::bind(&TcpServer::removeConnection, this, std::placeholders::_1) ); // 直接調用connectEstablished ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
這里就比較長了,我先說下大概他干了啥事情:首先通過輪詢找到下一個subloop
,然后將剛剛返回的connfd
和對應的peerAddr
以及localAddr
構造為一個TcpConnection
給subloop
,然后給這個conn
設置了一系列的回調函數,比如讀回調,寫回調,斷開回調等等。下一章我們來說下上面的代碼最后幾行在干嘛。
原文鏈接:https://www.cnblogs.com/shilinkun/archive/2022/04/24/16187509.html
相關推薦
- 2022-05-11 兩分鐘完成創建virtualbox創建k8s集群詳解
- 2022-09-29 基于PyQt5實現一個無線網連接器_python
- 2022-11-15 SQL?Server?ISNULL?不生效原因及解決_MsSql
- 2022-11-24 詳解Linux定時任務Crontab的介紹與使用_linux shell
- 2022-10-15 python中mpi4py的所有基礎使用案例詳解_python
- 2022-04-09 Linux普通用戶使用sudo顯示權限不足,admin is not in the sudoers
- 2022-02-13 error C4996:‘scanf‘:This function or variable may
- 2022-10-04 C語言指針和數組深入探究使用方法_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支