網站首頁 編程語言 正文
Golang 開發 Tcp 服務器及拆包粘包、優雅關閉的解決方案
Golang 作為廣泛用于服務端和云計算領域的編程語言,tcp socket 是其中至關重要的功能。您可以在 github.com/hdt3213/godis/tcp 中看到本文所述 TCP 服務器的完整代碼及其應用。
早期的 Tomcat/Apache 服務器使用的是阻塞 IO 模型。它使用一個線程處理一個連接,在沒有收到新數據時監聽線程處于阻塞狀態,直到數據就緒后線程被喚醒。因為阻塞 IO 模型需要開啟大量線程并且頻繁地進行上下文切換,所以效率很差。
IO 多路復用技術為了解決上述問題采用了一個線程監聽多路連接的方案。一個線程持有多個連接并阻塞等待,當其中某個連接可讀寫時線程被喚醒進行處理。因為多個連接復用了一個線程所以 IO 多路復用需要的線程數少很多。
主流操作系統都提供了IO多路復用技術的實現,比如 Linux上的 epoll,freeBSD 上的 kqueue 以及 Windows 平臺上的 iocp。有得必有失,因為 epoll 等技術提供的接口面向 IO 事件而非面向連接,所以需要編寫復雜的異步代碼,開發難度很大。
Golang 的 netpoller
基于IO多路復用和 goroutine scheduler 構建了一個簡潔高性能的網絡模型,并給開發者提供了 goroutine-per-connection
風格的極簡接口。
更多關于 netpoller
的剖析可以參考Golang實現四種負載均衡的算法(隨機,輪詢等), 接下來我們嘗試用 netpoller
編寫我們的服務器。
Echo 服務器
作為開始我們來實現一個簡單的 Echo 服務器。它會接受客戶端連接并將客戶端發送的內容原樣傳回客戶端。
package main import ( "fmt" "net" "io" "log" "bufio" ) func ListenAndServe(address string) { // 綁定監聽地址 listener, err := net.Listen("tcp", address) if err != nil { log.Fatal(fmt.Sprintf("listen err: %v", err)) } defer listener.Close() log.Println(fmt.Sprintf("bind: %s, start listening...", address)) for { // Accept 會一直阻塞直到有新的連接建立或者listen中斷才會返回 conn, err := listener.Accept() if err != nil { // 通常是由于listener被關閉無法繼續監聽導致的錯誤 log.Fatal(fmt.Sprintf("accept err: %v", err)) } // 開啟新的 goroutine 處理該連接 go Handle(conn) } } func Handle(conn net.Conn) { // 使用 bufio 標準庫提供的緩沖區功能 reader := bufio.NewReader(conn) for { // ReadString 會一直阻塞直到遇到分隔符 '\n' // 遇到分隔符后會返回上次遇到分隔符或連接建立后收到的所有數據, 包括分隔符本身 // 若在遇到分隔符之前遇到異常, ReadString 會返回已收到的數據和錯誤信息 msg, err := reader.ReadString('\n') if err != nil { // 通常遇到的錯誤是連接中斷或被關閉,用io.EOF表示 if err == io.EOF { log.Println("connection close") } else { log.Println(err) } return } b := []byte(msg) // 將收到的信息發送給客戶端 conn.Write(b) } } func main() { ListenAndServe(":8000") }
使用 telnet 工具測試我們編寫的 Echo 服務器:
$ telnet 127.0.0.1 8000 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. > a a > b b Connection closed by foreign host.
拆包與粘包問題
某些朋友可能看到"拆包與粘包"后表示極度震驚,并再三強調: TCP是個字節流協議,不存在粘包問題。
我們常說的 TCP 服務器并非「實現 TCP 協議的服務器」而是「基于TCP協議的應用層服務器」。TCP 是面向字節流的協議,而應用層協議大多是面向消息的,比如 HTTP 協議的請求/響應,Redis 協議的指令/回復都是以消息為單位進行通信的。
作為應用層服務器我們有責任從 TCP 提供的字節流中正確地解析出應用層消息,在這一步驟中我們會遇到「拆包/粘包」問題。
socket 允許我們通過 read 函數讀取新收到的一段數據(當然這段數據并不對應一個 TCP 包)。在上文的 Echo 服務器示例中我們用\n
表示消息結束,從 read 函數讀取的數據可能存在下列幾種情況:
- 收到兩段數據: "abc", "def\n" 它們屬于一條消息 "abcdef\n" 這是拆包的情況
- 收到一段數據: "abc\ndef\n" 它們屬于兩條消息 "abc\n", "def\n" 這是粘包的情況
應用層協議通常采用下列幾種思路之一來定義消息,以保證完整地進行讀取:
- 定長消息
- 在消息尾部添加特殊分隔符,如示例中的Echo協議和FTP控制協議。bufio 標準庫會緩存收到的數據直到遇到分隔符才會返回,它可以幫助我們正確地分割字節流。
- 將消息分為 header 和 body, 并在 header 中提供 body 總長度,這種分包方式被稱為 LTV(length,type,value) 包。這是應用最廣泛的策略,如HTTP協議。當從 header 中獲得 body 長度后, io.ReadFull 函數會讀取指定長度字節流,從而解析應用層消息。
在沒有具體應用層協議的情況下,我們很難詳細地討論拆包與粘包問題。在本系列的第二篇文章: 實現 Redis 協議解析器 中我們可以看到 Redis 序列化協議(RESP)對分隔符和 LTV 包的結合應用,以及兩種分包方式的具體解析代碼。
優雅關閉
在生產環境下需要保證TCP服務器關閉前完成必要的清理工作,包括將完成正在進行的數據傳輸,關閉TCP連接等。這種關閉模式稱為優雅關閉,可以避免資源泄露以及客戶端未收到完整數據導致故障。
TCP 服務器的優雅關閉模式通常為: 先關閉listener阻止新連接進入,然后遍歷所有連接逐個進行關閉。首先修改一下TCP服務器:
// handler 是應用層服務器的抽象 type Handler interface { Handle(ctx context.Context, conn net.Conn) Close()error } // 監聽并提供服務,并在收到 closeChan 發來的關閉通知后關閉 func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) { // 監聽關閉通知 go func() { <-closeChan logger.Info("shutting down...") // 停止監聽,listener.Accept()會立即返回 io.EOF _ = listener.Close() // 關閉應用層服務器 _ = handler.Close() }() // 在異常退出后釋放資源 defer func() { // close during unexpected error _ = listener.Close() _ = handler.Close() }() ctx := context.Background() var waitDone sync.WaitGroup for { // 監聽端口, 阻塞直到收到新連接或者出現錯誤 conn, err := listener.Accept() if err != nil { break } // 開啟 goroutine 來處理新連接 logger.Info("accept link") waitDone.Add(1) go func() { defer func() { waitDone.Done() }() handler.Handle(ctx, conn) }() } waitDone.Wait() } // ListenAndServeWithSignal 監聽中斷信號并通過 closeChan 通知服務器關閉 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error { closeChan := make(chan struct{}) sigCh := make(chan os.Signal) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) go func() { sig := <-sigCh switch sig { case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: closeChan <- struct{}{} } }() listener, err := net.Listen("tcp", cfg.Address) if err != nil { return err } logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) ListenAndServe(listener, handler, closeChan) return nil }
接下來修改應用層服務器:
// 客戶端連接的抽象 type Client struct { // tcp 連接 Conn net.Conn // 當服務端開始發送數據時進入waiting, 阻止其它goroutine關閉連接 // wait.Wait是作者編寫的帶有最大等待時間的封裝: // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go Waiting wait.Wait } type EchoHandler struct { // 保存所有工作狀態client的集合(把map當set用) // 需使用并發安全的容器 activeConn sync.Map // 關閉狀態標識位 closing atomic.AtomicBool } func MakeEchoHandler()(*EchoHandler) { return &EchoHandler{} } func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) { // 關閉中的 handler 不會處理新連接 if h.closing.Get() { conn.Close() return } client := &Client { Conn: conn, } h.activeConn.Store(client, struct{}{}) // 記住仍然存活的連接 reader := bufio.NewReader(conn) for { msg, err := reader.ReadString('\n') if err != nil { if err == io.EOF { logger.Info("connection close") h.activeConn.Delete(client) } else { logger.Warn(err) } return } // 發送數據前先置為waiting狀態,阻止連接被關閉 client.Waiting.Add(1) // 模擬關閉時未完成發送的情況 //logger.Info("sleeping") //time.Sleep(10 * time.Second) b := []byte(msg) conn.Write(b) // 發送完畢, 結束waiting client.Waiting.Done() } } // 關閉客戶端連接 func (c *Client)Close()error { // 等待數據發送完成或超時 c.Waiting.WaitWithTimeout(10 * time.Second) c.Conn.Close() return nil } // 關閉服務器 func (h *EchoHandler)Close()error { logger.Info("handler shutting down...") h.closing.Set(true) // 逐個關閉連接 h.activeConn.Range(func(key interface{}, val interface{})bool { client := key.(*Client) client.Close() return true }) return nil }
原文鏈接:https://www.cnblogs.com/Finley/p/11070669.html
相關推薦
- 2022-03-21 windows設置開機自動運行批處理的方法_DOS/BAT
- 2022-05-25 excel動態生成Sql語句
- 2023-10-10 微信授權與拒絕授權的彈窗處理
- 2022-10-10 pycharm創建并使用虛擬環境的詳細圖文教程_python
- 2022-12-04 Nginx?禁止直接訪問目錄或文件的操作方法_nginx
- 2022-08-11 C++超詳細講解強制類型轉換的用法_C 語言
- 2022-06-09 FreeRTOS動態內存分配管理heap_2示例_操作系統
- 2023-01-20 python如何實現完全數_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支