日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

淺談Golang數據競態_Golang

作者:耳冉鵝 ? 更新時間: 2023-04-17 編程語言

本文以一個簡單事例的多種解決方案作為引子,用結構體Demo來總結各種并發讀寫的情況

一個數據競態的case

package main

import (
	"fmt"
	"testing"
	"time"
)

func Test(t *testing.T) {
  fmt.Print("getNum(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNum()) + " ")
	}
	fmt.Println()

}
func getNum() int {
	var num int
	go func() {
		num = 53
	}()
	time.Sleep(500)
	return num
}

在case中,getNum先聲明一個變量num,之后在goRoutine中單讀對num進行設置,而此時程序也正從函數中返回num, 因為不知道goRoutine是否完成了對num的修改,所以會導致以下兩種結果:

  • goRoutine先完成對num的修改,最后返回5
  • 變量num的值從函數返回,結果為默認值0

操作完成的順序不同,導致最后的輸出結果不同,這就是將其稱為數據竟態的原因。

檢查數據競態

Go有內置的數據競爭檢測器,可以使用它來查看潛在的數據競爭條件。使用它就像-race在普通的Go命令行工具中添加標志一樣。

  • 運行時檢查: go run -race main.go
  • 構建時檢查: go build -race main.go
  • 測試時檢查: go test -race main.go

所有避免產生競態背后的核心原則是防止對同一變量或內存位置同時進行讀寫訪問

解決方案

1、WaitGroup等待

解決數據競態的最直接方法是阻止讀取訪問操作直到寫操作完成為止。
可以以最少的麻煩解決問題,但必須要保證Add和Done出現次數一致,否則會一致阻塞程序,無限制消耗內存,直至資源耗盡服務宕機

func getNumByWaitGroup() int {
	var num int
	var wg sync.WaitGroup
	wg.Add(1) // 表示有一個任務需要等待,等待任務數+1
	go func() {
		num = 53
		wg.Done() // 完成一個處于等待隊列的任務,等待任務-1

		// Done decrements the WaitGroup counter by one.
		// func (wg *WaitGroup) Done() {
		//	wg.Add(-1)
		//}

	}()
	wg.Wait() // 阻塞等待,直到等待隊列的任務數為0
	return num
}

2、Channel阻塞等待

與1相似

func getNumByChannel() int {
	var num int
	ch := make(chan struct{}) // 創建一個類型為結構體的channel,并初始化為空
	go func() {
		num = 53
		ch <- struct{}{} // 推送一個空結構體到ch
	}()
	<-ch // 使程序處于阻塞狀態,直到ch獲取到推送的值
	return num
}

3、Channel通道

獲取結果后通過通道推送結果,與前兩種方法不同,該方法不會進行任何阻塞。
相反,保留了阻塞調用代碼的時機,因此它允許更高級別的功能決定自己的阻塞合并發機制,而不是將getXX功能視為同步功能

func getNumByChan() <-chan int {
	var num int
	ch := make(chan int) // 創建一個類型為int的channel
	go func() {
		num = 53
		ch <- num // 推送一個int到ch
	}()

	return ch // 返回chan
}

4、互斥鎖

上述三種方法解決的是num在寫操作完成后才能讀取的情況
不管讀寫順序如何,只要求它們不能同時發生——> 互斥鎖

// 首先,創建一個結構體,其中包含我們想要返回的值以及一個互斥實例
type NumLock struct {
	val int
	m   sync.Mutex
}

func (num *NumLock) Get() int {
	// The `Lock` method of the mutex blocks if it is already locked
	// if not, then it blocks other calls until the `Unlock` method is called
	// Lock方法
	// 調用結構體對象的Lock方法將會鎖定該對象中的變量;如果沒有,將會阻塞其他調用,直到該互斥對象的Unlock方法被調用

	num.m.Lock()
	// 直到該方法返回,該實例對象才會被解鎖
	defer num.m.Unlock()
	// 返回安全類型的實例對象中的值
	return num.val
}

func (num *NumLock) Set(val int) {
	// 類似于上面的getNum方法,鎖定num對象直到寫入“num.val”的值完成
	num.m.Lock()
	defer num.m.Unlock()
	num.val = val
}

func getNumByLock() int {
	// 創建一個`NumLock`的示例
	num := &NumLock{}
	// 使用“Set”和“Get”來代替常規的復制修改和讀取值,這樣就可以確保只有在寫操作完成時我們才能進行閱讀,反之亦然
	go func() {
		num.Set(53)
	}()
	time.Sleep(500)
	return num.Get()
}

這里要注意,我們無法保證最后取得的num值
當有多個寫入和讀取操作混合在一起時,使用Mutex互斥可以保證讀寫的值與預期結果一致

附上結果:

完整代碼:

package main

import (
	"fmt"
	"strconv"
	"sync"
	"testing"
	"time"
)

func Test(t *testing.T) {
	fmt.Print("getNum(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNum()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByWaitGroup(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNumByWaitGroup()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByChannel(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNumByChannel()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByChan(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(<-getNumByChan()) + " ")
	}
	fmt.Println()
	fmt.Print("getNumByLock(): ")
	for i := 0; i < 10; i++ {
		fmt.Print(strconv.Itoa(getNumByLock()) + " ")
	}
	fmt.Println()
	fmt.Print("getFact(): ")
	fmt.Println(getFact())
	fmt.Println()
}
func getNum() int {
	var num int
	go func() {
		num = 53
	}()
	time.Sleep(500)
	return num
}

func getNumByWaitGroup() int {
	var num int
	var wg sync.WaitGroup
	wg.Add(1) // 表示有一個任務需要等待,等待任務數+1
	go func() {
		num = 53
		wg.Done() // 完成一個處于等待隊列的任務,等待任務-1

		// Done decrements the WaitGroup counter by one.
		// func (wg *WaitGroup) Done() {
		//	wg.Add(-1)
		//}

	}()
	wg.Wait() // 阻塞等待,直到等待隊列的任務數為0
	return num
}

func getNumByChannel() int {
	var num int
	ch := make(chan struct{}) // 創建一個類型為結構體的channel,并初始化為空
	go func() {
		num = 53
		ch <- struct{}{} // 推送一個空結構體到ch
	}()
	<-ch // 使程序處于阻塞狀態,直到ch獲取到推送的值
	return num
}

func getNumByChan() <-chan int {
	var num int
	ch := make(chan int) // 創建一個類型為int的channel
	go func() {
		num = 53
		ch <- num // 推送一個int到ch
	}()

	return ch // 返回chan
}

// 首先,創建一個結構體,其中包含我們想要返回的值以及一個互斥實例
type NumLock struct {
	val int
	m   sync.Mutex
}

func (num *NumLock) Get() int {
	// The `Lock` method of the mutex blocks if it is already locked
	// if not, then it blocks other calls until the `Unlock` method is called
	// Lock方法
	// 調用結構體對象的Lock方法將會鎖定該對象中的變量;如果沒有,將會阻塞其他調用,直到該互斥對象的Unlock方法被調用

	num.m.Lock()
	// 直到該方法返回,該實例對象才會被解鎖
	defer num.m.Unlock()
	// 返回安全類型的實例對象中的值
	return num.val
}

func (num *NumLock) Set(val int) {
	// 類似于上面的getNum方法,鎖定num對象直到寫入“num.val”的值完成
	num.m.Lock()
	defer num.m.Unlock()
	num.val = val
}

func getNumByLock() int {
	// 創建一個`NumLock`的示例
	num := &NumLock{}
	// 使用“Set”和“Get”來代替常規的復制修改和讀取值,這樣就可以確保只有在寫操作完成時我們才能進行閱讀,反之亦然
	go func() {
		num.Set(53)
	}()
	time.Sleep(500)
	return num.Get()
}

func getFact() []string {
	ch := make(chan string)
	//defer close(ch)
	res := make([]string, 0)
	num := &NumLock{}
	go func() {
		for i := 10; i > 0; i-- {
			num.Set(i)
			ch <- strconv.Itoa(num.Get())
		}
		close(ch)
	}()
	for i := range ch {
		res = append(res, i)
	}
	return res
}

典型數據競態

1、循環計數上的競態

func main() {
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			fmt.Println(i) // Not the 'i' you are looking for.
			wg.Done()
		}()
	}
	wg.Wait()
}

函數文字中的變量i與循環使用的變量相同,因此goroutine中的讀取與循環增量競爭。
(此程序通常打印55555,而不是01234)
該程序可以通過復制變量來修復:

func main() {
	var wg sync.WaitGroup
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func(j int) {
			fmt.Println(j) // Good. Read local copy of the loop counter.
			wg.Done()
		}(i)
	}
	wg.Wait()
}

2、意外共享變量

func ParallelWrite(data []byte) chan error {
	res := make(chan error, 2)
	f1, err := os.Create("file1")
	if err != nil {
		res <- err
	} else {
		go func() {
			// This err is shared with the main goroutine,
			// so the write races with the write below.
			_, err = f1.Write(data)
			res <- err
			f1.Close()
		}()
	}
	f2, err := os.Create("file2") // The second conflicting write to err.
	if err != nil {
		res <- err
	} else {
		go func() {
			_, err = f2.Write(data)
			res <- err
			f2.Close()
		}()
	}
	return res
}

修復方法是在goroutines中引入新變量(注意使用:=):

	...
	_, err := f1.Write(data)
	...
	_, err := f2.Write(data)
	...

3、無保護的全局變量

如果從幾個goroutine調用以下代碼,則會導致service的map產生競態。同一map的并發讀寫不安全:

var service map[string]net.Addr

func RegisterService(name string, addr net.Addr) {
	service[name] = addr
}

func LookupService(name string) net.Addr {
	return service[name]
}

To make the code safe, protect the accesses with a mutex:

var (
	service   map[string]net.Addr
	serviceMu sync.Mutex
)

func RegisterService(name string, addr net.Addr) {
	serviceMu.Lock()
	defer serviceMu.Unlock()
	service[name] = addr
}

func LookupService(name string) net.Addr {
	serviceMu.Lock()
	defer serviceMu.Unlock()
	return service[name]
}

4、原始無保護變量

數據競態也可以發生在原始類型的變量上(bool、int、int64等)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
	w.last = time.Now().UnixNano() // First conflicting access.
}

func (w *Watchdog) Start() {
	go func() {
		for {
			time.Sleep(time.Second)
			// Second conflicting access.
			if w.last < time.Now().Add(-10*time.Second).UnixNano() {
				fmt.Println("No keepalives for 10 seconds. Dying.")
				os.Exit(1)
			}
		}
	}()
}

即使這種“無辜”的數據競爭也可能導致因內存訪問的非原子性、干擾編譯器優化或訪問處理器內存的重新排序問題而導致難以調試的問題。

這場比賽的一個典型修復方法是使用通道或互斥體。為了保持無鎖行為,也可以使用sync/atomic包

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
	atomic.StoreInt64(&w.last, time.Now().UnixNano())
}

func (w *Watchdog) Start() {
	go func() {
		for {
			time.Sleep(time.Second)
			if atomic.LoadInt64(&w.last) < time.Now().Add(-10*time.Second).UnixNano() {
				fmt.Println("No keepalives for 10 seconds. Dying.")
				os.Exit(1)
			}
		}
	}()
}

5、未同步的發送和關閉操作

同一通道上的非同步發送和關閉操作也可能是一個競態條件

c := make(chan struct{}) // or buffered channel

// The race detector cannot derive the happens before relation
// for the following send and close operations. These two operations
// are unsynchronized and happen concurrently.
go func() { c <- struct{}{} }()
close(c)

根據Go內存模型,通道上的發送發生在該通道的相應接收完成之前。要同步發送和關閉操作,請使用接收操作來保證發送在關閉前完成:

c := make(chan struct{}) // or buffered channel

go func() { c <- struct{}{} }()
<-c
close(c)

原文鏈接:https://blog.csdn.net/qq_45366447/article/details/128871688

欄目分類
最近更新