日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

淺談Redis6.x io事件驅動模型

作者:cap_qin 更新時間: 2022-08-19 編程語言

Redis6.x io事件驅動模型

一、redis啟動流程

  • server.c中的main方法是啟動的入口,啟動主要分為三個步驟:initServer、InitServerLast和aeMain(server.el)。在這里插入圖片描述

二、 initServer初始化Server啟動相關的結構體

在這里插入圖片描述

  • aeCreateEventLoop:創建事件循環組el對象aeEventLoop
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
  • 設置accept監聽和監聽處理函數acceptTcpHandler
    在這里插入圖片描述
  1. aeCreateFileEvent函數設置accept監聽和文件描述符fd對應的aeFileEvent監聽函數rfileProc。其中accept監聽是由aeApiAddEvent函數實現的,不同操作系統,aeApiAddEvent的實現方式不同的,生產環境大部分操作系統是linux系統,所以我們主要跟蹤aeApiAddEvent的epoll實現,epoll_ctl()綁定文件描述符fd和ADD操作事件,后續el主線程通過epoll_await()監聽事件的發生。
  2. 事件循環組el監聽到accept事件后會根據fd調用對應accept監聽處理函數acceptTcpHandler,通過系統函數accept()獲得客戶端socket對應的cfd,connCreateAcceptedSocket通過cfd創建鏈接conn,給conn中的客戶端對應的fd綁定監聽并設置監聽函數readQueryFromClient(),最后,生成客戶端client結構體,并保存在server.clients。
    readQueryFromClient是客戶端請求的入口函數,接收客戶端的具體流程,在另外一篇文章介紹
/*networking:acceptTcpHandler. accept監聽處理函數*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
//創建connection
connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Socket;
    conn->fd = -1;

    return conn;
}

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
}
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
void linkClient(client *c) {
    listAddNodeTail(server.clients,c);
    /* Note that we remember the linked list node where the client is stored,
     * this way removing the client in unlinkClient() will not require
     * a linear scan, but just a constant time operation. */
    c->client_list_node = listLast(server.clients);
    uint64_t id = htonu64(c->id);
    raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
}

三、InitServerLast 初始化設置IO線程

void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
/* 為 threaded I/O初始化的數據 */
void initThreadedIO(void) {
    server.io_threads_active = 0; 

    /*只有一個io現場的話,不需要開啟初始化其他線程,io操作在main線程中 */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

四、aeMain 主線程

/*主線程el事件循環組*/
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, 
        AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP);
    }
}
  • aeMain是主線程事件循環組el的具體實現,以時間流水線時間片的方式執行,循環執行四個步驟: eventLoop->beforesleep(eventLoop)、aeApiPoll(eventLoop, tvp)、 eventLoop->aftersleep(eventLoop)和處理aeApiPoll事件的功能。
    在這里插入圖片描述
  • 這里主要介紹aeApiPoll(eventLoop, tvp)和處理aeApiPoll事件的功能:
  1. aeApiPoll和前面介紹的aeApiAddEvent一樣,根據不同的操作系統有不同的實現。這里linux系統,用epoll_wait來多路復用監聽accept和socket的read事件,和前面注冊的aeCreateFileEvent具體實現epoll_ctl對應。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
  1. 處理aeApiPoll事件,根據事件的文件描述符df,調用對應的event的事件中的回調函數fe->rfileProc(eventLoop,fd,fe->clientData,mask),rfileProc具體實現是前面介紹的通過aeCreateFileEvent注冊的acceptTcpHandler或readQueryFromClient。
 for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

             //根據mask&AE_READABLE類型,調用讀處理函數   
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }

原文鏈接:https://blog.csdn.net/qlxin2080/article/details/126273430

欄目分類
最近更新