網(wǎng)站首頁 編程語言 正文
一、Reactor介紹
reactor設(shè)計(jì)模式是event-driven architecture的一種實(shí)現(xiàn)方式,處理多個(gè)客戶端并發(fā)的向服務(wù)端請(qǐng)求服務(wù)的場景。每種服務(wù)在服務(wù)端可能由多個(gè)方法組成。reactor會(huì)解耦并發(fā)請(qǐng)求的服務(wù)并分發(fā)給對(duì)應(yīng)的事件處理器來處理。
中心思想是將所有要處理的I/o事件注冊(cè)到一個(gè)中心I/o多路復(fù)用器上,同時(shí)主線程/進(jìn)程阻塞在多路復(fù)用器上;一旦有I/o事件到來或是準(zhǔn)備就緒(文件描述符或socket可讀、寫),多路復(fù)用器返回并將事先注冊(cè)的相應(yīng)l/o事件分發(fā)到對(duì)應(yīng)的處理器中。
處理機(jī)制為:主程序?qū)⑹录约皩?duì)應(yīng)事件處理的方法在Reactor上進(jìn)行注冊(cè), 如果相應(yīng)的事件發(fā)生,Reactor將會(huì)主動(dòng)調(diào)用事件注冊(cè)的接口,即 回調(diào)函數(shù).
二、代碼實(shí)現(xiàn)
前提準(zhǔn)備:1單例模式:單例模式(Singleton Pattern,也稱為單件模式),使用最廣泛的設(shè)計(jì)模式之一。其意圖是保證一個(gè)類(結(jié)構(gòu)體)僅有一個(gè)實(shí)例,并提供一個(gè)訪問它的全局訪問點(diǎn),該實(shí)例被所有程序模塊共享。
2.回調(diào)函數(shù):把一段可執(zhí)行的代碼像參數(shù)傳遞那樣傳給其他代碼,而這段代碼會(huì)在某個(gè)時(shí)刻被調(diào)用執(zhí)行,這就叫做回調(diào)。
對(duì)epoll反應(yīng)堆中結(jié)構(gòu)體定義
/*fd包含的屬性*/
struct nitem { // fd
int fd; //要監(jiān)聽的文件描述符
int status; //是否在監(jiān)聽:1->在紅黑樹上(監(jiān)聽),0->不在(不監(jiān)聽)
int events; //對(duì)應(yīng)的監(jiān)聽事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回調(diào)函數(shù))
void *arg; //指向自己結(jié)構(gòu)體指針
#if 0
NCALLBACK callback;
#else
NCALLBACK *readcb; // epollin
NCALLBACK *writecb; // epollout
NCALLBACK *acceptcb; // epollin
#endif
unsigned char sbuffer[BUFFER_LENGTH]; //
int slength;
unsigned char rbuffer[BUFFER_LENGTH];
int rlength;
};
/*分塊存儲(chǔ)*/
struct itemblock {
struct itemblock *next;
struct nitem *items;
};
/*epoll反應(yīng)堆中包括通信的fd以及epoll的(epfd)*/
struct reactor {
int epfd;
struct itemblock *head;
};
單例模式,創(chuàng)建reactor的一個(gè)實(shí)例
/*單例模式*/
struct reactor *instance = NULL;
int init_reactor(struct reactor *r) {
if (r == NULL) return -1;
int epfd = epoll_create(1); //int size
r->epfd = epfd;
// fd --> item
r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
if (r->head == NULL) {
close(epfd);
return -2;
}
memset(r->head, 0, sizeof(struct itemblock));
r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
if (r->head->items == NULL) {
free(r->head);
close(epfd);
return -2;
}
memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
r->head->next = NULL;
return 0;
}
struct reactor *getInstance(void) { //singleton
if (instance == NULL) {
instance = (struct reactor *)malloc(sizeof(struct reactor));
if (instance == NULL) return NULL;
memset(instance, 0, sizeof(struct reactor));
if (0 > init_reactor(instance)) {
free(instance);
return NULL;
}
}
return instance;
}
事件注冊(cè)
/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/
/*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/
/*fd找到對(duì)應(yīng)事件*/
/*驅(qū)動(dòng)注冊(cè)*/
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
struct reactor *r = getInstance();
struct epoll_event ev = {0};
//1
if (event == READ_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].readcb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLIN;
}
//2
else if (event == WRITE_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].writecb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLOUT;
}
//3
else if (event == ACCEPT_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].acceptcb = cb; //回調(diào)函數(shù)
r->head->items[fd].arg = arg;
ev.events = EPOLLIN;
}
ev.data.ptr = &r->head->items[fd];
/*NOSET_CB 0*/
if (r->head->items[fd].events == NOSET_CB) {
if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
return -1;
}
r->head->items[fd].events = event;
} else if (r->head->items[fd].events != event) {
if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
printf("epoll_ctl EPOLL_CTL_MOD failed\n");
return -1;
}
r->head->items[fd].events = event;
}
return 0;
}
回調(diào)函數(shù)書寫
int write_callback(int fd, int event, void *arg) {
struct reactor *R = getInstance();
unsigned char *sbuffer = R->head->items[fd].sbuffer;
int length = R->head->items[fd].slength;
int ret = send(fd, sbuffer, length, 0);
if (ret < length) {
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
} else {
nreactor_set_event(fd, read_callback, READ_CB, NULL);
}
return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
struct reactor *R = getInstance();
unsigned char *buffer = R->head->items[fd].rbuffer;
#if 0 //ET
int idx = 0, ret = 0;
while (idx < BUFFER_LENGTH) {
ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
if (ret == -1) {
break;
} else if (ret > 0) {
idx += ret;
} else {// == 0
break;
}
}
if (idx == BUFFER_LENGTH && ret != -1) {
nreactor_set_event(fd, read_callback, READ_CB, NULL);
} else if (ret == 0) {
nreactor_set_event
//close(fd);
} else {
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
}
#else //LT
int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
if (ret == 0) { // fin
nreactor_del_event(fd, NULL, 0, NULL);
close(fd);
} else if (ret > 0) {
unsigned char *sbuffer = R->head->items[fd].sbuffer;
memcpy(sbuffer, buffer, ret);
R->head->items[fd].slength = ret;
printf("readcb: %s\n", sbuffer);
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
}
#endif
}
// web server
// ET / LT
int accept_callback(int fd, int event, void *arg) {
int connfd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}
監(jiān)聽描述符變化
// accept --> EPOLL
/*epoll_wait監(jiān)聽0*/
int reactor_loop(int listenfd) {
struct reactor *R = getInstance();
struct epoll_event events[POLL_SIZE] = {0};
while (1) {
int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
if (nready == -1) {
continue;
}
int i = 0;
for (i = 0;i < nready;i ++) {
struct nitem *item = (struct nitem *)events[i].data.ptr;
int connfd = item->fd;
if (connfd == listenfd) { //
item->acceptcb(listenfd, 0, NULL);
} else {
if (events[i].events & EPOLLIN) { //
item->readcb(connfd, 0, NULL);
}
if (events[i].events & EPOLLOUT) {
item->writecb(connfd, 0, NULL);
}
}
}
}
return 0;
}
完整代碼實(shí)現(xiàn)
#define MAXLNE 4096
#define POLL_SIZE 1024
#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENT 1024
#define NOSET_CB 0
#define READ_CB 1
#define WRITE_CB 2
#define ACCEPT_CB 3
/*單例模式*/
typedef int NCALLBACK(int fd, int event, void *arg);
/*fd包含的屬性*/
struct nitem { // fd
int fd; //要監(jiān)聽的文件描述符
int status; //是否在監(jiān)聽:1->在紅黑樹上(監(jiān)聽),0->不在(不監(jiān)聽)
int events; //對(duì)應(yīng)的監(jiān)聽事件, EPOLLIN和EPOLLOUT(不同的事件,走不同的回調(diào)函數(shù))
void *arg; //指向自己結(jié)構(gòu)體指針
#if 0
NCALLBACK callback;
#else
NCALLBACK *readcb; // epollin
NCALLBACK *writecb; // epollout
NCALLBACK *acceptcb; // epollin
#endif
unsigned char sbuffer[BUFFER_LENGTH]; //
int slength;
unsigned char rbuffer[BUFFER_LENGTH];
int rlength;
};
/*分塊存儲(chǔ)*/
struct itemblock {
struct itemblock *next;
struct nitem *items;
};
/*epoll反應(yīng)堆中包括通信的fd以及epoll的(epfd)*/
struct reactor {
int epfd;
struct itemblock *head;
};
/*初始化結(jié)構(gòu)體*/
int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);
/*單例模式*/
struct reactor *instance = NULL;
struct reactor *getInstance(void) { //singleton
if (instance == NULL) {
instance = (struct reactor *)malloc(sizeof(struct reactor));
if (instance == NULL) return NULL;
memset(instance, 0, sizeof(struct reactor));
if (0 > init_reactor(instance)) {
free(instance);
return NULL;
}
}
return instance;
}
/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/
/*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/
/*fd找到對(duì)應(yīng)事件*/
/*驅(qū)動(dòng)注冊(cè)*/
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
struct reactor *r = getInstance();
struct epoll_event ev = {0};
//1
if (event == READ_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].readcb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLIN;
}
//2
else if (event == WRITE_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].writecb = cb;
r->head->items[fd].arg = arg;
ev.events = EPOLLOUT;
}
//3
else if (event == ACCEPT_CB) {
r->head->items[fd].fd = fd;
r->head->items[fd].acceptcb = cb; //回調(diào)函數(shù)
r->head->items[fd].arg = arg;
ev.events = EPOLLIN;
}
ev.data.ptr = &r->head->items[fd];
/*NOSET_CB 0*/
if (r->head->items[fd].events == NOSET_CB) {
if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
return -1;
}
r->head->items[fd].events = event;
} else if (r->head->items[fd].events != event) {
if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
printf("epoll_ctl EPOLL_CTL_MOD failed\n");
return -1;
}
r->head->items[fd].events = event;
}
return 0;
}
/*nreactor_del_event(fd, NULL, 0, NULL);*/
/*下樹*/
/*nreactor_del_event(fd, NULL, 0, NULL);*/
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {
struct reactor *r = getInstance();
struct epoll_event ev = {0};
ev.data.ptr = arg;
epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
r->head->items[fd].events = 0;
return 0;
}
int write_callback(int fd, int event, void *arg) {
struct reactor *R = getInstance();
unsigned char *sbuffer = R->head->items[fd].sbuffer;
int length = R->head->items[fd].slength;
int ret = send(fd, sbuffer, length, 0);
if (ret < length) {
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
} else {
nreactor_set_event(fd, read_callback, READ_CB, NULL);
}
return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
struct reactor *R = getInstance();
unsigned char *buffer = R->head->items[fd].rbuffer;
#if 0 //ET
int idx = 0, ret = 0;
while (idx < BUFFER_LENGTH) {
ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
if (ret == -1) {
break;
} else if (ret > 0) {
idx += ret;
} else {// == 0
break;
}
}
if (idx == BUFFER_LENGTH && ret != -1) {
nreactor_set_event(fd, read_callback, READ_CB, NULL);
} else if (ret == 0) {
nreactor_set_event
//close(fd);
} else {
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
}
#else //LT
int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
if (ret == 0) { // fin
nreactor_del_event(fd, NULL, 0, NULL);
close(fd);
} else if (ret > 0) {
unsigned char *sbuffer = R->head->items[fd].sbuffer;
memcpy(sbuffer, buffer, ret);
R->head->items[fd].slength = ret;
printf("readcb: %s\n", sbuffer);
nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
}
#endif
}
// web server
// ET / LT
int accept_callback(int fd, int event, void *arg) {
int connfd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}
int init_server(int port) {
int listenfd;
struct sockaddr_in servaddr;
char buff[MAXLNE];
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
if (listen(listenfd, 10) == -1) {
printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
return 0;
}
return listenfd;
}
int init_reactor(struct reactor *r) {
if (r == NULL) return -1;
int epfd = epoll_create(1); //int size
r->epfd = epfd;
// fd --> item
r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
if (r->head == NULL) {
close(epfd);
return -2;
}
memset(r->head, 0, sizeof(struct itemblock));
r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
if (r->head->items == NULL) {
free(r->head);
close(epfd);
return -2;
}
memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
r->head->next = NULL;
return 0;
}
// accept --> EPOLL
/*epoll_wait監(jiān)聽0*/
int reactor_loop(int listenfd) {
struct reactor *R = getInstance();
struct epoll_event events[POLL_SIZE] = {0};
while (1) {
int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
if (nready == -1) {
continue;
}
int i = 0;
for (i = 0;i < nready;i ++) {
struct nitem *item = (struct nitem *)events[i].data.ptr;
int connfd = item->fd;
if (connfd == listenfd) { //
item->acceptcb(listenfd, 0, NULL);
} else {
if (events[i].events & EPOLLIN) { //
item->readcb(connfd, 0, NULL);
}
if (events[i].events & EPOLLOUT) {
item->writecb(connfd, 0, NULL);
}
}
}
}
return 0;
}
int main(int argc, char **argv)
{
int connfd, n;
int listenfd = init_server(9999);
nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
//nreactor_set_event(listenfd, accept_callback, read_callback, write_callback);
reactor_loop(listenfd);
return 0;
}
原文鏈接:https://blog.csdn.net/weixin_52259848/article/details/125354898
相關(guān)推薦
- 2022-08-21 golang數(shù)組內(nèi)存分配原理_Golang
- 2021-12-04 解決ASP.NET?Core中使用漏桶算法限流的問題_實(shí)用技巧
- 2023-06-20 Python實(shí)用技巧之臨時(shí)文件的妙用_python
- 2022-08-02 源碼解析python中randint函數(shù)的效率缺陷_python
- 2023-01-31 利用Rust編寫一個(gè)簡單的字符串時(shí)鐘_Rust語言
- 2022-04-21 Python?實(shí)現(xiàn)驅(qū)動(dòng)AI機(jī)器人_python
- 2023-02-14 C++深入分析數(shù)據(jù)在內(nèi)存中的存儲(chǔ)形態(tài)_C 語言
- 2022-11-08 Python?Panda中索引和選擇?series?的數(shù)據(jù)_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支