網站首頁 編程語言 正文
Redis6.x io事件驅動模型
一、redis啟動流程
- server.c中的main方法是啟動的入口,啟動主要分為三個步驟:initServer、InitServerLast和aeMain(server.el)。
二、 initServer初始化Server啟動相關的結構體
- aeCreateEventLoop:創建事件循環組el對象aeEventLoop
/* State of the event loop ("el"): registered file/time events, the events
 * fired by the last poll, and the polling-API private data (epoll state). */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId; /* id to assign to the next created time event */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead; /* head of the time-event linked list */
int stop; /* non-zero asks the loop to exit (checked by aeMain) */
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep; /* called before blocking in the poll */
aeBeforeSleepProc *aftersleep; /* called right after the poll returns */
int flags;
} aeEventLoop;
- 設置accept監聽和監聽處理函數acceptTcpHandler
- aeCreateFileEvent函數設置accept監聽和文件描述符fd對應的aeFileEvent監聽函數rfileProc。其中accept監聽是由aeApiAddEvent函數實現的,不同操作系統下,aeApiAddEvent的實現方式是不同的,生產環境大部分操作系統是linux系統,所以我們主要跟蹤aeApiAddEvent的epoll實現:epoll_ctl()綁定文件描述符fd和ADD操作事件,后續el主線程通過epoll_wait()監聽事件的發生。
- 事件循環組el監聽到accept事件后會根據fd調用對應accept監聽處理函數acceptTcpHandler,通過系統函數accept()獲得客戶端socket對應的cfd,connCreateAcceptedSocket通過cfd創建鏈接conn,給conn中的客戶端對應的fd綁定監聽并設置監聽函數readQueryFromClient(),最后,生成客戶端client結構體,并保存在server.clients。
readQueryFromClient是客戶端請求的入口函數,接收客戶端的具體流程,在另外一篇文章介紹
/* networking.c: read handler installed on the listening TCP socket.
 * Invoked by the event loop when the listen fd becomes readable; accepts
 * up to MAX_ACCEPTS_PER_CALL pending connections per invocation and hands
 * each new socket to acceptCommonHandler(). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char client_ip[NET_IP_STR_LEN];
    int client_port;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    /* Bound the number of accepts so one busy listener cannot starve
     * the rest of the event loop. */
    for (int remaining = MAX_ACCEPTS_PER_CALL; remaining > 0; remaining--) {
        int client_fd = anetTcpAccept(server.neterr, fd, client_ip,
                                      sizeof(client_ip), &client_port);
        if (client_fd == ANET_ERR) {
            /* EWOULDBLOCK simply means no more pending connections;
             * anything else is worth logging. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(client_fd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", client_ip, client_port);
        acceptCommonHandler(connCreateAcceptedSocket(client_fd),0,client_ip);
    }
}
/* Allocate and return a fresh, not-yet-connected socket connection object.
 * The caller owns the returned connection. */
connection *connCreateSocket() {
    connection *c = zcalloc(sizeof(connection));
    c->fd = -1;            /* no file descriptor attached yet */
    c->type = &CT_Socket;  /* bind the plain TCP socket method table */
    return c;
}
/* Method table ("vtable") mapping the generic connection API onto the
 * plain TCP socket implementation. conn->type->ae_handler is the function
 * the event loop invokes for every fd event on a socket connection.
 * Fix: the initializer was missing its terminating semicolon, which is a
 * syntax error at file scope. */
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};
/* Install (or, when func is NULL, remove) the read handler of a socket
 * connection, keeping the event-loop registration in sync: the fd is
 * watched for AE_READABLE exactly while a handler is installed.
 * Returns C_OK on success, C_ERR if the event registration fails. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (conn->read_handler == func) return C_OK;  /* nothing to change */

    conn->read_handler = func;
    if (conn->read_handler == NULL) {
        /* Handler removed: stop watching the fd for readability. */
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
        return C_OK;
    }
    /* Handler installed: register the fd; ae_handler dispatches to
     * read_handler when the fd fires. */
    if (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,
                          conn->type->ae_handler,conn) == AE_ERR)
        return C_ERR;
    return C_OK;
}
/* Register a newly created client: append it to server.clients and index
 * it by client id in server.clients_index. */
void linkClient(client *c) {
    /* Key for the rax index: the client id in network byte order. */
    uint64_t key = htonu64(c->id);
    listAddNodeTail(server.clients,c);
    /* Cache the list node holding this client so unlinkClient() can
     * remove it in constant time instead of scanning the list. */
    c->client_list_node = listLast(server.clients);
    raxInsert(server.clients_index,(unsigned char*)&key,sizeof(key),c,NULL);
}
三、InitServerLast 初始化設置IO線程
/* Final initialization steps that must run after the server config and the
 * main structures are ready; order of the calls below matters. */
void InitServerLast() {
bioInit();          /* start the background-I/O (bio) worker threads */
initThreadedIO();   /* spawn the threaded-I/O worker threads (see below) */
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
/* Record the memory footprint after full initialization as the baseline. */
server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the data structures and worker threads for threaded I/O. */
void initThreadedIO(void) {
server.io_threads_active = 0;
/* With a single I/O thread there is nothing to spawn: all I/O is done
 * directly in the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
/* Lock the mutex BEFORE creating the thread so the new thread blocks
 * until the main thread releases it. */
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
四、aeMain 主線程
/* Main-thread event loop: process events repeatedly until something sets
 * eventLoop->stop. Each iteration runs the before-sleep hook, polls for
 * events, runs the after-sleep hook, and dispatches fired events. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        aeProcessEvents(eventLoop,
                        AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
    }
}
- aeMain是主線程事件循環組el的具體實現,以事件循環的方式反復執行四個步驟: eventLoop->beforesleep(eventLoop)、aeApiPoll(eventLoop, tvp)、 eventLoop->aftersleep(eventLoop)和處理aeApiPoll事件的功能。
- 這里主要介紹aeApiPoll(eventLoop, tvp)和處理aeApiPoll事件的功能:
- aeApiPoll和前面介紹的aeApiAddEvent一樣,根據不同的操作系統有不同的實現。這里linux系統,用epoll_wait來多路復用監聽accept和socket的read事件,和前面注冊的aeCreateFileEvent具體實現epoll_ctl對應。
/* epoll implementation of the polling step: wait for events on the epoll
 * instance and translate each epoll event into an ae mask, filling
 * eventLoop->fired[]. Returns the number of fired events. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
/* tvp == NULL means block indefinitely (-1); otherwise round the timeout
 * up to whole milliseconds. */
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
/* Error/hang-up conditions are reported as both readable and
 * writable so the handlers get a chance to detect them. */
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
/* NOTE(review): retval == -1 (epoll_wait failure) is silently ignored in
 * this excerpt; upstream Redis aborts on errors other than EINTR — confirm
 * against the full ae_epoll.c source. */
return numevents;
}
- 處理aeApiPoll事件,根據事件的文件描述符fd,調用對應的event的事件中的回調函數fe->rfileProc(eventLoop,fd,fe->clientData,mask),rfileProc具體實現是前面介紹的通過aeCreateFileEvent注冊的acceptTcpHandler或readQueryFromClient。
/* NOTE(review): fragment of the dispatch loop inside aeProcessEvents();
 * `invert`, `processed`, `j` and `numevents` are declared in the enclosing
 * function, which is not shown here. */
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normal order: fire the readable event first (mask & AE_READABLE). */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
 * after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
原文鏈接:https://blog.csdn.net/qlxin2080/article/details/126273430
- 上一篇:React組件通信
- 下一篇:python中的函數和變量的用法
相關推薦
- 2023-10-15 el-input有時候添加不了有時候刪不了
- 2022-09-19 Nginx最大連接數配置詳解_nginx
- 2022-09-22 Mybaits一級緩存和二級緩存分別是什么,區別是什么?
- 2022-09-29 DevExpress的DateEdit設置顯示日期和時間的方法_C#教程
- 2024-07-13 Springboot使用注解實現權限校驗
- 2022-07-08 Redis中Redisson布隆過濾器的學習_Redis
- 2022-03-26 C++函數指針的用法詳解_C 語言
- 2022-04-11 Android開發中checkBox自定義樣式
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支