網站首頁 編程語言 正文
1. 前言
請思考以下幾個問題:
1).為什么需要設計網絡緩沖區,內核中不是有讀寫緩沖區嗎?
需要設計的網絡緩沖區和內核中TCP緩沖區的關系如下圖所示,通過socket進行進行綁定。具體來說網絡緩沖區包括讀(接收)緩沖區和寫(發送)緩沖區。設計讀緩沖區的目的是:當從TCP中讀數據時,不能確定讀到的是一個完整的數據包,如果是不完整的數據包,需要先放入緩沖區中進行緩存,直到數據包完整才進行業務處理。設計寫緩沖區的目的是:向TCP寫數據不能保證所有數據寫成功,如果TCP寫緩沖區已滿,則會丟棄數據包,所以需要一個寫緩沖區暫時存儲需要寫的數據。
2).緩沖區應該設置為堆內存還是棧內存?
假設有一個服務端程序,需要同時連接多個客戶端,每一個socket就是一個連接對象,所以不同的socket都需要自己對應的讀寫緩沖區。如果將緩沖區設置為棧內存,很容易爆掉,故將將其設置為堆內存更加合理。此外,緩沖區容量上限一般是有限制的,一開始不需要分配過大,僅僅在緩沖區不足時進行擴展。
3).讀寫緩沖區的基本要求是什么?
通過以上分析,不難得出讀寫緩沖區雖然是兩個獨立的緩沖區,但是其核心功能相同,可以復用其代碼。
讀寫緩沖區至少提供兩類接口:存儲數據和讀取數據
讀寫緩沖區要求:先進先出,保證存儲的數據是有序的
4).如何界定數據包?
第一種使用特殊字符界定數據包:例如\n
,\r\n
,第二種通過長度界定數據包,數據包中首先存儲的是整個數據包的長度,再根據長度進行讀取。
5).幾種常見的緩沖區設計
①ringbuffer+讀寫指針
ringbuffer是一段連續的內存,當末端已經寫入數據后,會從頭部繼續寫數據,所以感覺上像一個環,實際是一個循環數組。ringbuffer的缺點也很明顯:不能夠擴展、對于首尾部分的數據需要增加一次IO調用。
②可擴展的讀寫緩沖區+讀寫指針
下圖設計了一種可擴展的讀寫緩沖區,在創建時會分配一塊固定大小的內存,整個結構分為預留空間數據空間。預留空間用于存儲必要的信息,真正存儲數據的空間由連續內存組成。此種緩沖區設計相對于ringbuffer能夠擴展,但是也有一定的缺點:由于需要最大化利用空間,會將數據移動至開頭,移動操作會降低讀寫速度。
本文實現可擴展的讀寫緩沖區+讀寫指針
2. 數據結構
①Buffer類的設計與初始化
Buffer類的數據結構如下所示,m_s
是指向緩沖區的指針,m_max_size
是緩沖區的長度,初始設置為10,并根據擴展因子m_expand_par
進行倍增。擴展因子m_expand_par
設置為2,表示每次擴增長度翻倍,也就是說緩沖區的長度隨擴展依次為10、20、40、80。
class Buffer{ public: Buffer(); //構造 ~Buffer(); int init(); //分配緩沖區 private: char* m_s; //緩沖區指針 size_t m_read_index; //讀指針位置 size_t m_write_index; //寫指針位置 size_t m_max_size; //緩沖區長度 size_t m_expand_par; //擴展因子 };
構造函數的初始化列表中初始化成員變量。實際初始化緩沖區在init函數中分配內存,大小為m_max_size
。不在構造函數中初始化緩沖區的原因是:如果構造函數中分配失敗,無法處理,也可使用RAII手段進行處理
Buffer::Buffer() :m_read_index(0),m_write_index(0),m_max_size(10), m_expand_par(2),m_s(nullptr) {} Buffer::~Buffer() { delete[] m_s; } int Buffer::init() { m_s = new char[m_max_size](); if (m_s == nullptr) { cout << "分配m_s失敗\n"; return -1; } return 0; }
②讀寫指針的位置變化
當緩沖區為空時,讀寫指針位置相同都為0。
在寫入長度為6的數據后,讀寫指針位置如圖
接著讀取兩個字節后,讀寫指針如圖
③擴展緩沖區實現
擴展緩沖區實際分為兩步,將有效數據前移至緩沖區頭(最大化利用數據),再進行擴展。根據成員變量擴展因子m_expand_par
的值,將緩沖區按倍數擴大。
假設當前存儲數據4個字節,讀寫指針如下圖。需要新增9個字節
將數據前移至緩沖區頭
擴展緩沖區為2倍
寫入9個字節
實際需要實現的兩個私有成員函數:調整數據位置至緩沖區頭adjust_buffer()
和擴展expand_buffer()
,設置為私有屬性則是因為不希望用戶調用,僅僅在寫入緩沖區前判斷不夠就進行擴展,用戶不應該知道與主動調用。
class Buffer { public: ... private: void adjust_buffer(); //調整數據位置至緩沖區頭部頭 void expand_buffer(size_t need_size); //擴展緩沖區長度 ... }
adjust_buffer()
實現如下,注釋寫的較為清楚,不再贅述
void Buffer::adjust_buffer() { if (m_read_index == 0) //數據已經在頭部,直接返回 return; int used_size = m_write_index - m_read_index; if (used_size == 0) { //緩沖區為空,重置讀寫指針 m_write_index = 0; m_read_index = 0; } else { cout << "調整前read_index write_index" << m_read_index << " " << m_write_index << endl; memcpy(m_s, &m_s[m_read_index], used_size); //將數據拷貝至頭部 m_write_index -= m_read_index; //寫指針也前移 cout << "調整了" << used_size << "個字節" << endl; m_read_index = 0; //讀指針置0 } cout << "調整后read_index write_index" << m_read_index << " " << m_write_index << endl; }
擴展緩沖區實現如下:
- 首先根據需要寫入的字節數判斷緩沖區長度多大才能夠容下
- 申請新的存儲區,并將數據拷貝到新存儲區
- 釋放舊緩沖區,將新存儲區作為緩沖區
void Buffer::expand_buffer(size_t need_size) //need_size需要寫入的字節數 { size_t used_size = m_write_index - m_read_index; //used_size表示已經存儲的字節數 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空間 size_t expand_size = m_max_size; while (remain_size < need_size) { //剩余空間不夠時擴展,用while表示直到擴展至夠用 expand_size *= m_expand_par; remain_size = expand_size - used_size; //cout << "擴展長度中... 總剩余 總長度 " << remain_size << " " << expand_size << endl; } char* s1 = new char[expand_size](); //申請新的空間 memcpy(s1, m_s, m_max_size); free(m_s); m_s = s1; //將新空間掛載到緩沖區 m_max_size = expand_size; //更新緩沖區總長度 //cout << "擴展結束,總長度為" << m_max_size << endl; }
3. 外部接口設計與實現
以讀緩沖區為例需要提供的接口有:向緩沖區寫入數據write_to_buffer()
,向緩沖區讀取數據read_from_buffer()
,得到能夠讀取的最大字節數readable_bytes()
。
class Buffer { public: void write_to_buffer(char* src); //從src中寫數據 size_t readable_bytes(); //存儲數據的字節數 size_t read_from_buffer(char *dst,int bytes); //讀數據 size_t pop_bytes(size_t bytes); //丟棄數據 }
① 寫入緩沖區write_to_buffer()
write_to_buffer()
實現的思路如流程圖所示:
判斷剩余空間:
剩余空間不夠:調整數據至頭部、擴展緩沖區
剩余空間足夠:向下繼續
判斷當前空間:
當前空間不夠:調整數據至頭部
剩余空間足夠:向下繼續
存儲數據
根據流程圖實現起來邏輯非常清晰,src表示原始數據
void Buffer::write_to_buffer(char* src) { size_t used_size = m_write_index - m_read_index; //used_size表示已經存儲的字節數 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空間 size_t cur_size = m_max_size - m_write_index; //cur_size表示當前能夠存儲的空間 size_t size = init_random_write(&src); //printf("已經使用%d,剩余總長度%d,剩余當前長度%d\n", used_size, remain_size, cur_size); if (size > remain_size) { //剩余空間不夠 adjust_buffer(); expand_buffer(size); } else if (size > cur_size) { //剩余空間夠,當前存不下 adjust_buffer(); } memcpy(&m_s[m_write_index], src, size); //存儲數據 m_write_index += size; delete[] src; //更新并打印log //used_size = m_write_index - m_read_index; //remain_size = m_max_size - used_size; //cur_size = m_max_size - m_write_index; //printf("已經使用%d,剩余總長度%d,剩余當前長度%d\n", used_size, remain_size, cur_size); }
流程圖中還出現隨機一段數據,這是用來調試的。隨機初始化一段長度為0~ 40,字符a~ z的數據,并寫緩存區
static int get_random_len() { return rand() % 40; } static int get_random_ala() { return rand() % 26; } size_t Buffer::init_random_write(char** src) { int size = get_random_len(); char ala = get_random_ala(); *src = new char[size]; cout << "準備寫入的長度為" << size << " 值全是 " << (unsigned char)('a' + ala) << endl; for (int i = 0; i < size; i++) { (*src)[i] = 'a' + ala; } return size; }
② 讀取緩沖區read_from_buffer()
read_from_buffer(char*dst,int read_size)
傳入需要拷貝到目的地址和需要讀取的字節數,需要注意的是需要讀取的字節數為-1
表示全部讀取,函數返回實際讀取的字節數。實現如流程圖所示:
代碼如下
size_t Buffer::read_from_buffer(char*dst,int read_size) { size_t read_max = m_write_index - m_read_index; //read_max存儲的字節數 if (read_size == 0 || read_max == 0) //讀取0字節和空緩存區時直接返回 return 0; if (read_size == -1) { //全讀走 memcpy(dst, &m_s[m_read_index], read_max); m_read_index += read_max; cout << "讀取了" << read_max << "個字節" << endl; } else if (read_size > 0) { //讀取指定字節 if ((size_t)read_size > read_max) read_size = read_max; memcpy(dst, &m_s[m_read_index], read_size); m_read_index += read_size; cout << "讀取了" << read_size << "個字節" << endl; } return read_size; //返回讀取的字節數 }
③ 丟棄數據pop_bytes
size_t pop_bytes(size_t size)
傳入需要丟棄的字節數,需要注意的是需要丟棄的字節數為-1
表示全部丟棄;-2表示隨機丟棄0~ 40字節,函數返回實際丟棄的字節數。實現如流程圖所示:
size_t Buffer::pop_bytes(size_t size) { size_t read_max = m_write_index - m_read_index; //存儲數據長度 //test random if (size == -2) size = get_random_len(); if (size == 0 || read_max == 0) //緩沖區為空或丟棄0字節返回 return 0; if (size == -1) { //全丟 m_read_index += read_max; cout << "丟棄了" << read_max << "個字節" << endl; return read_max; } if (size > 0) { //丟棄指定字節 if (size > read_max) size = read_max; m_read_index += size; cout << "丟棄了" << size << "個字節" << endl; } return size; }
④ 其他接口
peek_read()
和peek_write()
返回讀寫指針的位置
size_t peek_read(); //指向準備讀的位置(調試用) size_t peek_write(); //指向準備寫的位置(調試用) size_t Buffer::peek_write() { return m_write_index; } size_t Buffer::peek_read() { return m_read_index; }
4. 完整代碼與測試
① 完整代碼
Buffer.h
#pragma once class Buffer { public: Buffer(); //構造 ~Buffer(); //析構 int init(); //分配緩沖區 void write_to_buffer(char* src); //寫數據 size_t pop_bytes(size_t bytes); //丟棄數據 size_t read_from_buffer(char *dst,int bytes);//讀數據 size_t readable_bytes(); //得到存儲數據的字節數 size_t peek_read(); //指向準備讀的位置(調試用) size_t peek_write(); //指向準備寫的位置(調試用) private: void adjust_buffer(); //調整數據位置至緩沖區頭 void expand_buffer(size_t need_size); //擴展緩沖區長度 size_t init_random_write(char** src); //隨機初始化一段數據(調試用) private: char* m_s; //緩沖區指針 size_t m_read_index; //讀指針位置 size_t m_write_index; //寫指針位置 size_t m_max_size; //緩沖區長度 size_t m_expand_par; //擴展因子 };
Buffer.cpp
:
#include "Buffer.h" #include<iostream> #include<time.h> using namespace std; int total_write = 0; //記錄總寫入 int total_read = 0; //記錄總讀取 static int get_random_len() { return rand() % 40; } static int get_random_ala() { return rand() % 26; } Buffer::Buffer() :m_read_index(0),m_write_index(0),m_max_size(10), m_expand_par(2),m_s(nullptr) {} Buffer::~Buffer() { delete[] m_s; } int Buffer::init() { m_s = new char[m_max_size](); if (m_s == nullptr) { cout << "分配m_s失敗\n"; return -1; } return 0; } size_t Buffer::read_from_buffer(char*dst,int read_size) { size_t read_max = m_write_index - m_read_index; //read_max存儲的字節數 if (read_size == 0 || read_max == 0) //讀取0字節和空緩存區時直接返回 return 0; if (read_size == -1) { //全讀走 memcpy(dst, &m_s[m_read_index], read_max); m_read_index += read_max; printf("讀取完成:\t讀取%d個字節\n", read_max); total_read += read_max; } else if (read_size > 0) { //讀取指定字節 if ((size_t)read_size > read_max) read_size = read_max; memcpy(dst, &m_s[m_read_index], read_size); m_read_index += read_size; printf("讀取完成:\t讀取%d個字節\n", read_size); total_read += read_size; } return read_size; //返回讀取的字節數 } size_t Buffer::readable_bytes() { return m_write_index - m_read_index; } size_t Buffer::peek_write() { return m_write_index; } size_t Buffer::peek_read() { return m_read_index; } void Buffer::write_to_buffer(char* src) { size_t used_size = m_write_index - m_read_index; //used_size表示已經存儲的字節數 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空間 size_t cur_size = m_max_size - m_write_index; //cur_size表示當前能夠存儲的空間 size_t size = init_random_write(&src); //printf("已經使用%d,剩余總長度%d,剩余當前長度%d\n", used_size, remain_size, cur_size); if (size > remain_size) { //剩余空間不夠 adjust_buffer(); expand_buffer(size); } else if (size > cur_size) { //剩余空間夠,當前存不下 adjust_buffer(); } memcpy(&m_s[m_write_index], src, size); //存儲數據 m_write_index += size; delete[] src; //更新并打印log used_size = m_write_index - m_read_index; remain_size = m_max_size - used_size; cur_size = m_max_size - m_write_index; printf("寫入完成:\t總存儲%d,剩余空間%d,剩余當前空間%d\n", used_size, remain_size, cur_size); } size_t Buffer::pop_bytes(size_t size) { size_t read_max = m_write_index - m_read_index; //存儲數據長度 //test random if (size == -2) size = get_random_len(); if (size == 0 || read_max == 0) //緩沖區為空或丟棄0字節返回 return 0; if (size == -1) { //全丟 m_read_index += read_max; cout << "丟棄了" << read_max << "個字節" << endl; total_read += read_max; return read_max; } if (size > 0) { //丟棄指定字節 if (size > read_max) size = read_max; m_read_index += size; cout << "丟棄了" << size << "個字節" << endl; total_read += size; } return size; } size_t Buffer::init_random_write(char** src) { int size = get_random_len(); total_write += size; *src = new char[size]; char ala = get_random_ala(); cout << "隨機寫入:\t長度為" << size << " 值全是 " << (unsigned char)('a' + ala) << endl; for (int i = 0; i < size; i++) { (*src)[i] = 'a' + ala; } return size; } void Buffer::adjust_buffer() { if (m_read_index == 0) //數據已經在頭部,直接返回 return; int used_size = m_write_index - m_read_index; if (used_size == 0) { //緩沖區為空,重置讀寫指針 m_write_index = 0; m_read_index = 0; } else { cout << "調整前read_index write_index" << m_read_index << " " << m_write_index << endl; memcpy(m_s, &m_s[m_read_index], used_size); //將數據拷貝至頭部 m_write_index -= m_read_index; //寫指針也前移 cout << "調整了" << used_size << "個字節" << endl; m_read_index = 0; //讀指針置0 } cout << "調整后read_index write_index" << m_read_index << " " << m_write_index << endl; } void Buffer::expand_buffer(size_t need_size) //need_size需要寫入的字節數 { size_t used_size = m_write_index - m_read_index; //used_size表示已經存儲的字節數 size_t remain_size = m_max_size - used_size; //remain_size表示剩余空間 size_t expand_size = m_max_size; while (remain_size < need_size) { //剩余空間不夠時擴展,用while表示直到擴展至夠用 expand_size *= m_expand_par; remain_size = expand_size - used_size; cout << "擴展長度中... 總剩余 總長度 " << remain_size << " " << expand_size << endl; } char* s1 = new char[expand_size](); //申請新的空間 memcpy(s1, m_s, m_max_size); free(m_s); m_s = s1; //將新空間掛載到緩沖區 m_max_size = expand_size; //更新緩沖區總長度 cout << "擴展結束,總長度為" << m_max_size << endl; }
② 測試
int main() { srand((unsigned)time(NULL)); //調試需要初始化隨機種子 Buffer* pbuffer = new Buffer(); //創建Buffer對象 if (pbuffer->init() != 0) //init函數分配緩沖區 return 0; { char* s = nullptr; //s是指向隨機數據的指針 char* read = new char[1000]; //讀取時將數據存儲到的指針read size_t read_size = 0; //本次讀取到的字節數 pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); read_size = read_size = pbuffer-> read_from_buffer(read, 0); pbuffer->write_to_buffer(s); cout << "總寫入\t" << total_write << endl;; cout << "總讀取\t" << total_read << endl; cout << "目前寫入" << total_write - total_read << endl; cout << "可讀取\t" << pbuffer->readable_bytes()<< endl; printf(" write %d read %d \n", pbuffer->peek_write(),pbuffer->peek_read()); if (total_write - total_read != pbuffer->readable_bytes()) { //根據總寫入-總讀取和一共存儲的字節數判斷是否存儲正確 cout << "error!!!" << endl; } else cout << "test is ok\n\n\n"; } delete s; delete[] read; delete pbuffer; return 0; }
隨機1000000次測試
int main() { srand((unsigned)time(NULL)); //調試需要初始化隨機種子 Buffer* pbuffer = new Buffer(); //創建Buffer對象 if (pbuffer->init() != 0) //init函數分配緩沖區 return 0; char* s = nullptr; //s是指向隨機數據的指針 char* read = new char[1000]; //讀取時將數據存儲到的指針read size_t read_size = 0; //本次讀取到的字節數 unsigned long long time = 0; //調試的循環次數 while (1) { pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); read_size = read_size = pbuffer-> read_from_buffer(read, 0); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, 22); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, -1); pbuffer->pop_bytes(-2); pbuffer->pop_bytes(-2); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, 2); pbuffer->write_to_buffer(s); read_size = pbuffer-> read_from_buffer(read, 17); pbuffer->write_to_buffer(s); pbuffer->pop_bytes(-2); pbuffer->write_to_buffer(s); pbuffer->write_to_buffer(s); pbuffer->read_from_buffer(read, 18); cout << "總寫入\t" << total_write << endl;; cout << "總讀取\t" << total_read << endl; cout << "目前寫入" << total_write - total_read << endl; cout << "可讀取\t" << pbuffer->readable_bytes()<< endl; printf(" write %d read %d \n", pbuffer->peek_write(),pbuffer->peek_read()); if (total_write - total_read != pbuffer->readable_bytes()) { //根據總寫入-總讀取和一共存儲的字節數判斷是否存儲正確 cout << "error!!!" << endl; break; } if (time == 1000000) //循環1000000次 { cout << "1000000 ok!!!" << endl; break; } cout << time++ << " is ok\n\n\n"; } delete s; delete[] read; delete pbuffer; return 0; }
原文鏈接:https://blog.csdn.net/shi_xiao_xuan/article/details/122054839
相關推薦
- 2023-05-03 Pycharm?Terminal?與Project?interpreter?安裝包不同步問題解決_p
- 2024-01-06 RocketMQ消息丟失問題
- 2022-12-13 Prometheus?Operator架構介紹_云其它
- 2022-04-12 Qt實現實時鼠標繪制圖形_C 語言
- 2022-09-04 docker部署可執行jar包的思路與完整步驟_docker
- 2022-04-12 常見的狀態碼出現原因200、301、302、403、404、500、503
- 2022-11-20 更強大的React?狀態管理庫Zustand使用詳解_React
- 2022-04-28 Pytorch中torch.flatten()和torch.nn.Flatten()實例詳解_pyt
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支