日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

go實現分布式鎖

作者:.番茄炒蛋 更新時間: 2022-07-03 編程語言

簡介

本文代碼地址

本文以扣減庫存為例,分別實現進程鎖;mysql的悲觀鎖;樂觀鎖以及redis的分布式鎖

CREATE TABLE `stocks` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `goods` varchar(20) DEFAULT NULL COMMENT '商品id',
  `stocks` int(11) DEFAULT NULL COMMENT '庫存',
  `version` int(11) DEFAULT NULL COMMENT '樂觀鎖',
  PRIMARY KEY (`id`),
  KEY `idx_stocks_goods` (`goods`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

在這里插入圖片描述

無鎖

service

package service

import (
	context "context"
	"go-locks/no-lock/db"
	"go-locks/no-lock/model"
	"go-locks/no-lock/proto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
)

type Server struct{}

func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
	tx := db.DB.Begin()
	for _, info := range request.StockInfos {
		var stock model.Stock
		if result := tx.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
			return nil, status.Error(codes.NotFound, "商品信息不存在")
		}
		if stock.Stocks < info.Num {
			// 庫存不足 回滾事務
			tx.Rollback()
			return nil, status.Error(codes.ResourceExhausted, "庫存不足")
		}
		stock.Stocks -= info.Num
		tx.Save(stock)
	}
	tx.Commit()
	return &emptypb.Empty{}, nil
}

client

package main

import (
	"context"
	"go-locks/no-lock/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"sync"
)

var client proto.StockClient

func main() {
	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	client = proto.NewStockClient(conn)
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go TestSellStock(&wg)
	}
	wg.Wait()
}

func TestSellStock(wg *sync.WaitGroup) {
	defer wg.Done()
	_, err := client.SellStock(context.Background(), &proto.StockRequest{
		StockInfos: []*proto.StockInfo{
			{
				GoodsId: "123456",
				Num:     1,
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

測試

在這里插入圖片描述
開啟20gorouinte去扣減123456的庫存;正常情況下123456的庫存應該剩余480件,但由于我們沒有進行加鎖,導致庫存還剩485件.這種情況在真實場景下是絕對不能接受的

進程鎖

service

package service

import (
	context "context"
	"go-locks/process-lock/db"
	"go-locks/process-lock/model"
	"go-locks/process-lock/proto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	"sync"
)

type Server struct{}

var mutex sync.Mutex

func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
	// 加鎖
	mutex.Lock()
	tx := db.DB.Begin()
	for _, info := range request.StockInfos {
		var stock model.Stock
		if result := tx.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
			return nil, status.Error(codes.NotFound, "商品信息不存在")
		}
		if stock.Stocks < info.Num {
			// 庫存不足 回滾事務
			tx.Rollback()
			return nil, status.Error(codes.ResourceExhausted, "庫存不足")
		}
		stock.Stocks -= info.Num
		tx.Save(stock)
	}
	tx.Commit()
	// 解鎖
	mutex.Unlock()
	return &emptypb.Empty{}, nil
}

client

package main

import (
	"context"
	"go-locks/process-lock/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"sync"
)

var client proto.StockClient

func main() {
	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	client = proto.NewStockClient(conn)
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go TestSellStock(&wg)
	}
	wg.Wait()
}

func TestSellStock(wg *sync.WaitGroup) {
	defer wg.Done()
	_, err := client.SellStock(context.Background(), &proto.StockRequest{
		StockInfos: []*proto.StockInfo{
			{
				GoodsId: "123457",
				Num:     1,
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

測試

在這里插入圖片描述
單從這次結果上看是沒有什么問題,但其實還是有問題的:

  1. 這里使用的鎖,他是鎖住的整塊代碼,不管進來的是那個商品都會要等待釋放鎖才能去獲取鎖執行,有很嚴重的性能問題
  2. 這里使用到的鎖,是進程級別的鎖,是go語言提供的鎖,但是在真實的場景下都是多實例部署的,在多實例場景下,仍然會出現無鎖時的問題

mysql悲觀鎖

悲觀鎖總是假設最壞的情況,每次去拿數據的時候都認為別人會修改,所以每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會阻塞直到它拿到鎖

service

package service

import (
	context "context"
	"go-locks/pessimistic-lock/db"
	"go-locks/pessimistic-lock/model"
	"go-locks/pessimistic-lock/proto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	"gorm.io/gorm/clause"
)

type Server struct{}

func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
	tx := db.DB.Begin()
	for _, info := range request.StockInfos {
		var stock model.Stock
		// 通過for update 語句實現mysql的悲觀鎖
		if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
			return nil, status.Error(codes.NotFound, "商品信息不存在")
		}
		if stock.Stocks < info.Num {
			// 庫存不足 回滾事務
			tx.Rollback()
			return nil, status.Error(codes.ResourceExhausted, "庫存不足")
		}
		stock.Stocks -= info.Num
		tx.Save(stock)
	}
	tx.Commit()
	return &emptypb.Empty{}, nil
}

client

package main

import (
	"context"
	"go-locks/pessimistic-lock/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"sync"
)

var client proto.StockClient

func main() {
	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	client = proto.NewStockClient(conn)
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go TestSellStock(&wg)
	}
	wg.Wait()
}

func TestSellStock(wg *sync.WaitGroup) {
	defer wg.Done()
	_, err := client.SellStock(context.Background(), &proto.StockRequest{
		StockInfos: []*proto.StockInfo{
			{
				GoodsId: "123458",
				Num:     1,
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

測試

在這里插入圖片描述

mysql樂觀鎖

樂觀鎖顧名思義,就是很樂觀,每次去拿數據的時候都認為別人不會修改,所以不會上鎖,但是在更新的時候會判斷一下在此期間別人有沒有去更新這個數據,可以使用版本號等機制。樂觀鎖適用于多讀的應用類型,這樣可以提高吞吐量,

service

package service

import (
	context "context"
	"go-locks/optimistic-lock/db"
	"go-locks/optimistic-lock/model"
	"go-locks/optimistic-lock/proto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	"log"
)

type Server struct{}

func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
	tx := db.DB.Begin()
	for _, info := range request.StockInfos {
		var stock model.Stock
		for {
			if result := db.DB.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
				return nil, status.Error(codes.NotFound, "商品信息不存在")
			}
			if stock.Stocks < info.Num {
				// 庫存不足 回滾事務
				tx.Rollback()
				return nil, status.Error(codes.ResourceExhausted, "庫存不足")
			}
			stock.Stocks -= info.Num
			if result := tx.Model(&model.Stock{}).Select("Stocks", "Version").
				Where("goods = ? AND version = ?", info.GoodsId, stock.Version).Updates(&model.Stock{Stocks: stock.Stocks, Version: stock.Version + 1}); result.RowsAffected == 0 {
				// version 字段沖突;扣減失敗
				log.Println("庫存扣減失敗;重試")
			} else {
				// 庫存扣減成功
				log.Println("庫存扣減成功")
				break
			}
		}
	}
	tx.Commit()
	return &emptypb.Empty{}, nil
}

client

package main

import (
	"context"
	"go-locks/optimistic-lock/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"sync"
)

var client proto.StockClient

func main() {
	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	client = proto.NewStockClient(conn)
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go TestSellStock(&wg)
	}
	wg.Wait()
}

func TestSellStock(wg *sync.WaitGroup) {
	defer wg.Done()
	_, err := client.SellStock(context.Background(), &proto.StockRequest{
		StockInfos: []*proto.StockInfo{
			{
				GoodsId: "123459",
				Num:     1,
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

測試

在這里插入圖片描述

redis分布式鎖

service

package service

import (
	context "context"
	"fmt"
	"go-locks/redis-lock/db"
	"go-locks/redis-lock/model"
	"go-locks/redis-lock/proto"
	"go-locks/redis-lock/redis"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
)

type Server struct{}

func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
	tx := db.DB.Begin()
	for _, info := range request.StockInfos {
		var stock model.Stock
		// 使用redis分布式鎖,僅對當前商品進行加鎖,不會影響其他商品
		mutex := redis.Redsy.NewMutex(fmt.Sprintf("goods_%s", info.GoodsId))
		if err := mutex.Lock(); err != nil {
			return nil, status.Error(codes.Internal, "獲取redis分布式鎖異常")
		}
		if result := tx.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
			return nil, status.Error(codes.NotFound, "商品信息不存在")
		}
		if stock.Stocks < info.Num {
			// 庫存不足 回滾事務
			tx.Rollback()
			return nil, status.Error(codes.ResourceExhausted, "庫存不足")
		}
		stock.Stocks -= info.Num
		tx.Save(stock)
		if ok, err := mutex.Unlock(); !ok || err != nil {
			return nil, status.Error(codes.Internal, "釋放redis分布式鎖異常")
		}
	}
	tx.Commit()
	return &emptypb.Empty{}, nil
}

client

package main

import (
	"context"
	"go-locks/redis-lock/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"sync"
)

var client proto.StockClient

func main() {
	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	client = proto.NewStockClient(conn)
	var wg sync.WaitGroup
	wg.Add(20)
	for i := 0; i < 20; i++ {
		go TestSellStock(&wg)
	}
	wg.Wait()
}

func TestSellStock(wg *sync.WaitGroup) {
	defer wg.Done()
	_, err := client.SellStock(context.Background(), &proto.StockRequest{
		StockInfos: []*proto.StockInfo{
			{
				GoodsId: "123460",
				Num:     1,
			},
		},
	})
	if err != nil {
		panic(err)
	}
}

測試

在這里插入圖片描述

總結

  • 無鎖: 無鎖即使在單機情況下也會出問題,不建議使用
  • 進程鎖: 進程鎖僅在單機情況下安全,性能存在瓶頸
  • mysql悲觀鎖: 分布式環境下安全.比較適合寫入操作比較頻繁的場景,如果出現大量的讀取操作,每次讀取的時候都會進行加鎖,這樣會增加大量的鎖的開銷,降低了系統的吞吐量.在特殊情況下還會升級成表鎖,分布式環境下安全,但是性能依然存在瓶頸
  • mysql樂觀鎖: 分布式環境下安全,比較適合讀取操作比較頻繁的場景,如果出現大量的寫入操作,數據發生沖突的可能性就會增大,為了保證數據的一致性,應用層需要不斷的重新獲取數據,這樣會增加大量的查詢操作,降低了系統的吞吐量.
  • redis分布式鎖: 分布式環境下安全,并且redis有很好的性能,而且可以對單個商品進行加鎖,只會阻塞住對同一商品的請求,并不會阻塞所有請求,大大提升了吞吐量

原文鏈接:https://blog.csdn.net/qq_43135259/article/details/125508990

欄目分類
最近更新