網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
這次我們開始muduo
源代碼的實(shí)際編寫,首先我們知道muduo
是LT
模式,Reactor
模式,下圖為Reactor
模式的流程圖[來(lái)源1]
然后我們來(lái)看下muduo
的整體架構(gòu)[來(lái)源1]
首先muduo
有一個(gè)主反應(yīng)堆mainReactor
以及幾個(gè)子反應(yīng)堆subReactor
,其中子反應(yīng)堆的個(gè)數(shù)由用戶使用setThreadNum
函數(shù)設(shè)置,mainReactor
中主要有一個(gè)Acceptor
,當(dāng)用戶建立新的連接的時(shí)候,Acceptor
會(huì)將connfd
和對(duì)應(yīng)的事件打包為一個(gè)channel
然后采用輪詢的算法,指定將該channel
給所選擇的subReactor
,以后該subReactor
就負(fù)責(zé)該channel
的所有工作。
TcpServer類
我們按照從上到下的思路進(jìn)行講解,以下內(nèi)容我們按照一個(gè)簡(jiǎn)單的EchoServer
的實(shí)現(xiàn)思路來(lái)講解,我們知道當(dāng)我們自己實(shí)現(xiàn)一個(gè)Server
的時(shí)候,會(huì)在構(gòu)造函數(shù)中實(shí)例化一個(gè)TcpServer
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name) : server_(loop, addr, name) , loop_(loop) { // 注冊(cè)回調(diào)函數(shù) server_.setConnectionCallback( std::bind(&EchoServer::onConnection, this, std::placeholders::_1) ); server_.setMessageCallback( std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3) // 設(shè)置合適的loop線程數(shù)量 loopthread 不包括baseloop server_.setThreadNum(3); }
于是我們?nèi)タ聪?code>TcpServer的構(gòu)造函數(shù)是在干什么
TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option) : loop_(CheckLoopNotNull(loop)) , ipPort_(listenAddr.toIpPort()) , name_(nameArg) , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)) , threadPool_(new EventLoopThreadPool(loop, name_)) , connectionCallback_() , messageCallback_() , nextConnId_(1) , started_(0) { // 當(dāng)有新用戶連接時(shí)候,會(huì)執(zhí)行該回調(diào)函數(shù) acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); }
我們只需要關(guān)注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點(diǎn),構(gòu)造了一個(gè)Acceptor
,我們首先要知道Acceptor
主要就是連接新用戶并打包為一個(gè)Channel
,所以我們就應(yīng)該知道Acceptor
按道理應(yīng)該實(shí)現(xiàn)socket
,bind
,listen
,accept
這四個(gè)函數(shù)。
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport) : loop_(loop), acceptSocket_(createNonblocking()) // socket , acceptChannel_(loop, acceptSocket_.fd()), listenning_(false) { acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(true); acceptSocket_.bindAddress(listenAddr); // 綁定套接字 // 有新用戶的連接,執(zhí)行一個(gè)回調(diào)(打包為channel) acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }
其中Acceptor
中有個(gè)acceptSocket_
,其實(shí)就是我們平時(shí)所用的listenfd
,構(gòu)造函數(shù)中實(shí)現(xiàn)了socket
,bind
,而其余的兩個(gè)函數(shù)的使用在其余代碼
// 開啟服務(wù)器監(jiān)聽 void TcpServer::start() { // 防止一個(gè)TcpServer被start多次 if (started_++ == 0) { threadPool_->start(threadInitCallback_); // 啟動(dòng)底層的loop線程池,這里會(huì)按照設(shè)定了threadnum設(shè)置pool的數(shù)量 loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())); } }
我們知道,當(dāng)我們?cè)O(shè)置了threadnum
之后,就會(huì)有一個(gè)mainloop
,那么這個(gè)loop_
就是那個(gè)mainloop
,其中可以看見這個(gè)loop_
就只做一個(gè)事情Acceptor::listen
。
void Acceptor::listen() { listenning_ = true; acceptSocket_.listen(); // listen acceptChannel_.enableReading(); // acceptChannel_ => Poller }
這里就實(shí)現(xiàn)了listen
函數(shù),還有最后一個(gè)函數(shù)accept
,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()
之后就會(huì)使得這個(gè)listenfd
所在的channel
對(duì)讀事件感興趣,那什么時(shí)候會(huì)有讀事件呢,就是當(dāng)用戶建立新連接的時(shí)候,那么我們應(yīng)該想一下,那當(dāng)感興趣的事件發(fā)生之后,listenfd
應(yīng)該干什么呢,應(yīng)該執(zhí)行一個(gè)回調(diào)函數(shù)呀。注意Acceptor
構(gòu)造函數(shù)中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
這就是那個(gè)回調(diào),我們?nèi)タ聪?code>handleRead在干嘛。
// listenfd有事件發(fā)生了,就是有新用戶連接了 void Acceptor::handleRead() { InetAddress peerAddr; int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // 若用戶實(shí)現(xiàn)定義了,則執(zhí)行,否則說(shuō)明用戶對(duì)新到來(lái)的連接沒有需要執(zhí)行的,所以直接關(guān)閉 if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發(fā)當(dāng)前的新客戶端的Channel } else { ::close(connfd); } } ... }
這里是不是就實(shí)現(xiàn)了accept
函數(shù),至此當(dāng)用戶建立一個(gè)新的連接時(shí)候,Acceptor
就會(huì)得到一個(gè)connfd
和其對(duì)應(yīng)的peerAddr
返回給mainloop
,這時(shí)候我們?cè)谧⒁獾?code>TcpServer構(gòu)造函數(shù)中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
我們給acceptor_
設(shè)置了一個(gè)newConnectionCallback_
,于是由上面的代碼就可以知道,if (newConnectionCallback_)
為真,就會(huì)執(zhí)行這個(gè)回調(diào)函數(shù),于是就會(huì)執(zhí)行TcpServer::newConnection
,我們?nèi)タ聪逻@個(gè)函數(shù)是在干嘛。
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) { // 輪詢算法選擇一個(gè)subloop來(lái)管理對(duì)應(yīng)的這個(gè)新連接 EventLoop *ioLoop = threadPool_->getNextLoop(); char buf[64] = {0}; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; std::string connName = name_ + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); // 通過sockfd獲取其綁定的本地ip和端口 sockaddr_in local; ::bzero(&local, sizeof local); socklen_t addrlen = sizeof local; if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0) { LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); // 根據(jù)連接成功的sockfd,創(chuàng)建TcpConnection TcpConnectionPtr conn(new TcpConnection( ioLoop, connName, sockfd, // Socket Channel localAddr, peerAddr)); connections_[connName] = conn; // 下面的回調(diào)時(shí)用戶設(shè)置給TcpServer,TcpServer又設(shè)置給TcpConnection,TcpConnetion又設(shè)置給Channel,Channel又設(shè)置給Poller,Poller通知channel調(diào)用這個(gè)回調(diào) conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); // 設(shè)置了如何關(guān)閉連接的回調(diào) conn->setCloseCallback( std::bind(&TcpServer::removeConnection, this, std::placeholders::_1) ); // 直接調(diào)用connectEstablished ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
這里就比較長(zhǎng)了,我先說(shuō)下大概他干了啥事情:首先通過輪詢找到下一個(gè)subloop
,然后將剛剛返回的connfd
和對(duì)應(yīng)的peerAddr
以及localAddr
構(gòu)造為一個(gè)TcpConnection
給subloop
,然后給這個(gè)conn
設(shè)置了一系列的回調(diào)函數(shù),比如讀回調(diào),寫回調(diào),斷開回調(diào)等等。下一章我們來(lái)說(shuō)下上面的代碼最后幾行在干嘛。
原文鏈接:https://www.cnblogs.com/shilinkun/archive/2022/04/24/16187509.html
相關(guān)推薦
- 2022-07-06 R語(yǔ)言可視化ggplot2繪制24小時(shí)動(dòng)態(tài)血糖圖_R語(yǔ)言
- 2022-03-28 Python獲取網(wǎng)絡(luò)時(shí)間戳的兩種方法詳解_python
- 2023-07-29 使用 XMLHttpRequest 實(shí)現(xiàn) ajax
- 2021-12-13 Go中時(shí)間與時(shí)區(qū)問題的深入講解_Golang
- 2022-05-10 動(dòng)態(tài)獲取從 [Element-ui] 的 Select (選擇器)選中的 label 值得 id 與
- 2023-08-30 linux服務(wù)器使用rsync 和 inotify或者sersync 實(shí)現(xiàn)服務(wù)器之間文件實(shí)時(shí)同步
- 2022-08-19 Python數(shù)據(jù)處理pandas讀寫操作IO工具CSV解析_python
- 2022-07-18 數(shù)據(jù)結(jié)構(gòu) III 深入理解棧和隊(duì)列實(shí)現(xiàn)
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支