日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Golang?編寫Tcp服務器的解決方案_Golang

作者:Finley ? 更新時間: 2022-12-01 編程語言

Golang 開發 Tcp 服務器及拆包粘包、優雅關閉的解決方案

Golang 作為廣泛用于服務端和云計算領域的編程語言,tcp socket 是其中至關重要的功能。您可以在 github.com/hdt3213/godis/tcp 中看到本文所述 TCP 服務器的完整代碼及其應用。

早期的 Tomcat/Apache 服務器使用的是阻塞 IO 模型。它使用一個線程處理一個連接,在沒有收到新數據時監聽線程處于阻塞狀態,直到數據就緒后線程被喚醒。因為阻塞 IO 模型需要開啟大量線程并且頻繁地進行上下文切換,所以效率很差。

IO 多路復用技術為了解決上述問題采用了一個線程監聽多路連接的方案。一個線程持有多個連接并阻塞等待,當其中某個連接可讀寫時線程被喚醒進行處理。因為多個連接復用了一個線程所以 IO 多路復用需要的線程數少很多。

主流操作系統都提供了IO多路復用技術的實現,比如 Linux上的 epoll,freeBSD 上的 kqueue 以及 Windows 平臺上的 iocp。有得必有失,因為 epoll 等技術提供的接口面向 IO 事件而非面向連接,所以需要編寫復雜的異步代碼,開發難度很大。

Golang 的 netpoller 基于IO多路復用和 goroutine scheduler 構建了一個簡潔高性能的網絡模型,并給開發者提供了 goroutine-per-connection 風格的極簡接口。

更多關于 netpoller 的剖析可以參考Golang實現四種負載均衡的算法(隨機,輪詢等), 接下來我們嘗試用 netpoller 編寫我們的服務器。

Echo 服務器

作為開始我們來實現一個簡單的 Echo 服務器。它會接受客戶端連接并將客戶端發送的內容原樣傳回客戶端。

package main

import (
    "fmt"
    "net"
    "io"
    "log"
    "bufio"
)

func ListenAndServe(address string) {
    // 綁定監聽地址
    listener, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatal(fmt.Sprintf("listen err: %v", err))
    }
    defer listener.Close()
    log.Println(fmt.Sprintf("bind: %s, start listening...", address))

    for {
        // Accept 會一直阻塞直到有新的連接建立或者listen中斷才會返回
        conn, err := listener.Accept()
        if err != nil {
            // 通常是由于listener被關閉無法繼續監聽導致的錯誤
            log.Fatal(fmt.Sprintf("accept err: %v", err))
        }
        // 開啟新的 goroutine 處理該連接
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    // 使用 bufio 標準庫提供的緩沖區功能
    reader := bufio.NewReader(conn)
    for {
        // ReadString 會一直阻塞直到遇到分隔符 '\n'
        // 遇到分隔符后會返回上次遇到分隔符或連接建立后收到的所有數據, 包括分隔符本身
        // 若在遇到分隔符之前遇到異常, ReadString 會返回已收到的數據和錯誤信息
        msg, err := reader.ReadString('\n')
        if err != nil {
            // 通常遇到的錯誤是連接中斷或被關閉,用io.EOF表示
            if err == io.EOF {
                log.Println("connection close")
            } else {
                log.Println(err)
            }
            return
        }
        b := []byte(msg)
        // 將收到的信息發送給客戶端
        conn.Write(b)
    }
}

func main() {
    ListenAndServe(":8000")
}

使用 telnet 工具測試我們編寫的 Echo 服務器:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
> a
a
> b
b
Connection closed by foreign host.

拆包與粘包問題

某些朋友可能看到"拆包與粘包"后表示極度震驚,并再三強調: TCP是個字節流協議,不存在粘包問題。

我們常說的 TCP 服務器并非「實現 TCP 協議的服務器」而是「基于TCP協議的應用層服務器」。TCP 是面向字節流的協議,而應用層協議大多是面向消息的,比如 HTTP 協議的請求/響應,Redis 協議的指令/回復都是以消息為單位進行通信的。

作為應用層服務器我們有責任從 TCP 提供的字節流中正確地解析出應用層消息,在這一步驟中我們會遇到「拆包/粘包」問題。

socket 允許我們通過 read 函數讀取新收到的一段數據(當然這段數據并不對應一個 TCP 包)。在上文的 Echo 服務器示例中我們用\n表示消息結束,從 read 函數讀取的數據可能存在下列幾種情況:

  • 收到兩段數據: "abc", "def\n" 它們屬于一條消息 "abcdef\n" 這是拆包的情況
  • 收到一段數據: "abc\ndef\n" 它們屬于兩條消息 "abc\n", "def\n" 這是粘包的情況

應用層協議通常采用下列幾種思路之一來定義消息,以保證完整地進行讀取:

  • 定長消息
  • 在消息尾部添加特殊分隔符,如示例中的Echo協議和FTP控制協議。bufio 標準庫會緩存收到的數據直到遇到分隔符才會返回,它可以幫助我們正確地分割字節流。
  • 將消息分為 header 和 body, 并在 header 中提供 body 總長度,這種分包方式被稱為 LTV(length,type,value) 包。這是應用最廣泛的策略,如HTTP協議。當從 header 中獲得 body 長度后, io.ReadFull 函數會讀取指定長度字節流,從而解析應用層消息。

在沒有具體應用層協議的情況下,我們很難詳細地討論拆包與粘包問題。在本系列的第二篇文章: 實現 Redis 協議解析器 中我們可以看到 Redis 序列化協議(RESP)對分隔符和 LTV 包的結合應用,以及兩種分包方式的具體解析代碼。

優雅關閉

在生產環境下需要保證TCP服務器關閉前完成必要的清理工作,包括將完成正在進行的數據傳輸,關閉TCP連接等。這種關閉模式稱為優雅關閉,可以避免資源泄露以及客戶端未收到完整數據導致故障。

TCP 服務器的優雅關閉模式通常為: 先關閉listener阻止新連接進入,然后遍歷所有連接逐個進行關閉。首先修改一下TCP服務器:

// handler 是應用層服務器的抽象
type Handler interface {
    Handle(ctx context.Context, conn net.Conn)
    Close()error
}

// 監聽并提供服務,并在收到 closeChan 發來的關閉通知后關閉
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
    // 監聽關閉通知
    go func() {
        <-closeChan
        logger.Info("shutting down...")
        // 停止監聽,listener.Accept()會立即返回 io.EOF
        _ = listener.Close()
        // 關閉應用層服務器 
        _ = handler.Close()
    }()

    // 在異常退出后釋放資源
    defer func() {
        // close during unexpected error
        _ = listener.Close()
        _ = handler.Close()
    }()
    ctx := context.Background()
    var waitDone sync.WaitGroup
    for {
        // 監聽端口, 阻塞直到收到新連接或者出現錯誤
        conn, err := listener.Accept()
        if err != nil {
            break
        }
        // 開啟 goroutine 來處理新連接
        logger.Info("accept link")
        waitDone.Add(1)
        go func() {
            defer func() {
                waitDone.Done()
            }()
            handler.Handle(ctx, conn)
        }()
    }
    waitDone.Wait()
}

// ListenAndServeWithSignal 監聽中斷信號并通過 closeChan 通知服務器關閉
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
    closeChan := make(chan struct{})
    sigCh := make(chan os.Signal)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        sig := <-sigCh
        switch sig {
        case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
            closeChan <- struct{}{}
        }
    }()
    listener, err := net.Listen("tcp", cfg.Address)
    if err != nil {
        return err
    }
    logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
    ListenAndServe(listener, handler, closeChan)
    return nil
}

接下來修改應用層服務器:

// 客戶端連接的抽象
type Client struct {
    // tcp 連接
    Conn net.Conn
    // 當服務端開始發送數據時進入waiting, 阻止其它goroutine關閉連接
    // wait.Wait是作者編寫的帶有最大等待時間的封裝: 
    // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
    Waiting wait.Wait
}

type EchoHandler struct {
    
    // 保存所有工作狀態client的集合(把map當set用)
    // 需使用并發安全的容器
    activeConn sync.Map 

    // 關閉狀態標識位
    closing atomic.AtomicBool
}

func MakeEchoHandler()(*EchoHandler) {
    return &EchoHandler{}
}

func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) {
    // 關閉中的 handler 不會處理新連接
    if h.closing.Get() {
        conn.Close()
        return 
    }

    client := &Client {
        Conn: conn,
    }
    h.activeConn.Store(client, struct{}{}) // 記住仍然存活的連接

    reader := bufio.NewReader(conn)
    for {
        msg, err := reader.ReadString('\n')
        if err != nil {
            if err == io.EOF {
                logger.Info("connection close")
                h.activeConn.Delete(client)
            } else {
                logger.Warn(err)
            }
            return
        }
        // 發送數據前先置為waiting狀態,阻止連接被關閉
        client.Waiting.Add(1)

        // 模擬關閉時未完成發送的情況
        //logger.Info("sleeping")
        //time.Sleep(10 * time.Second)

        b := []byte(msg)
        conn.Write(b)
        // 發送完畢, 結束waiting
        client.Waiting.Done()
    }
}

// 關閉客戶端連接
func (c *Client)Close()error {
    // 等待數據發送完成或超時
    c.Waiting.WaitWithTimeout(10 * time.Second)
    c.Conn.Close()
    return nil
}

// 關閉服務器
func (h *EchoHandler)Close()error {
    logger.Info("handler shutting down...")
    h.closing.Set(true)
    // 逐個關閉連接
    h.activeConn.Range(func(key interface{}, val interface{})bool {
        client := key.(*Client)
        client.Close()
        return true
    })
    return nil
}

原文鏈接:https://www.cnblogs.com/Finley/p/11070669.html

欄目分類
最近更新