網站首頁 編程語言 正文
前言:
在分布式場景下,相信你或多或少需要使用分布式鎖來訪問臨界資源,或者控制耗時操作的并發性。
當然,實現分布式鎖的方案也比較多,比如數據庫、redis、zk 等等。本文主要結合一個線上案例,講解 redis 分布式鎖的相關實現。
一、問題描述:
某天線上出現了數據重復處理問題,經排查后發現,竟然是單次處理時間較長,redis 分布式鎖提前釋放
導致相同請求并發處理。
其實,這是一個鎖續約
的問題,對于一把分布式鎖,我們需要考慮,設置鎖多長時間過期、出現異常如何釋放鎖?
以上問題便是本文要討論的主題。
二、原因分析:
? ? ? 項目采用較簡單的自定義 redis 分布式鎖,為避免死鎖定義默認過期時間 10s,如下:
override fun lock() { while (true) { //嘗試獲取鎖 if (tryLock()) { return } try { Thread.sleep(10) } catch (e: InterruptedException) { e.printStackTrace() } } } override fun tryLock(): Boolean { val value = getUniqueSign() // 隨機串 val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS) if (flag != null && flag) { VALUE_lOCAL.set(value) INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1) return true } return false }
缺乏對鎖自動續期等實現。
三、解決方案:
1、思考:?
針對這種場景,可以考慮的是如何給鎖自動續期-當業務沒有執行結束的情況下,當然也可以自定義實現 比如開一個后臺線程定時的給這些拿到鎖的線程續期。
Redisson 也正是基于這種思路實現自動續期的分布式鎖,各種異常情況也考慮的更加完善,綜合考慮采用 Redisson 的分布式鎖解決方案優化。
2、Redisson簡單配置:
@Configuration @EnableConfigurationProperties(RedissonProperties::class) class RedissonConfig { @Bean fun redissonClient(redissonProperties: RedissonProperties): RedissonClient { val config = Config() val singleServerConfig = redissonProperties.singleServerConfig!! config.useSingleServer().setAddress(singleServerConfig.address) .setDatabase(singleServerConfig.database) .setUsername(singleServerConfig.username) .setPassword(singleServerConfig.password) .setConnectionPoolSize(singleServerConfig.connectionPoolSize) .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize) .setConnectTimeout(singleServerConfig.connectTimeout) .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout) .setRetryInterval(singleServerConfig.retryInterval) .setRetryAttempts(singleServerConfig.retryAttempts) .setTimeout(singleServerConfig.timeout) return Redisson.create(config) } } @ConfigurationProperties(prefix = "xxx.redisson") class RedissonProperties { var singleServerConfig: SingleServerConfig? = null }
Redis 服務使用的騰訊云的哨兵模式架構,此架構對外開放一個代理地址訪問,因此這里配置單機模式配置即可。
如果你是自己搭建的 redis 哨兵模式架構,需要按照文檔配置相關必要參數
3、使用樣例:
... @Autowired lateinit var redissonClient: RedissonClient ... fun xxx() { ... val lock = redissonClient.getLock("mylock") lock.lock() try { ... } finally { lock.unlock() } ... }
使用方式和JDK提供的鎖是不是很像?是不是很簡單?
正是Redisson這類優秀的開源產品的出現,才讓我們將更多的時間投入到業務開發中...
四、源碼分析
下面來看看 Redisson 對常規分布式鎖的實現,主要分析 RedissonLock
1、lock加鎖操作
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } // 租約期限, 也就是expire時間, -1代表未設置 將使用系統默認的30s private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 嘗試拿鎖, 如果能拿到就直接返回 long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止 try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } ... } } finally { unsubscribe(future, threadId); } }
1.1、tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // 調用真正獲取鎖的操作 if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired // 這里是成功獲取了鎖, 嘗試給鎖續約 if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 通過lua腳本真正執行加鎖的操作 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 如果key不存在, 那正好, 直接set并設置過期時間 // 如果key存在, 就有兩種情況需要考慮 // - 同一線程獲取重入鎖,直接將field(也就是getLockName(threadId))對應的value值+1 // - 不同線程競爭鎖, 此次加鎖失敗, 并直接返回此key對應的過期時間 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
1.2、續約
通過 scheduleExpirationRenewal 給鎖續約
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); // 續約操作 renewExpiration(); } } private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 設置延遲任務task, 在時長internalLockLeaseTime/3之后執行, 定期給鎖續期 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 真正執行續期命令操作 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 這次續期之后, 繼續schedule自己, 達到持續續期的效果 if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } // 所謂續期, 就是將expire過期時間再延長 protected RFuture<Boolean> renewExpirationAsync(long threadId) { // 如果key以及當前線程存在, 則延長expire時間, 并返回1代表成功;否則返回0代表失敗 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
2、unlock解鎖操作
public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { ... } } public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<>(); // 執行解鎖操作 RFuture<Boolean> future = unlockInnerAsync(threadId); // 操作成功之后做的事 future.onComplete((opStatus, e) -> { // 取消續約task cancelExpirationRenewal(threadId); ... }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 如果key以及當前線程對應的記錄已經不存在, 直接返回空 // 否在將field(也就是getLockName(threadId))對應的value減1 // - 如果減去1之后值還大于0, 那么重新延長過期時間 // - 如果減去之后值小于等于0, 那么直接刪除key, 并發布訂閱消息 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
以上便是 redisson 客戶端工具對 redis 分布式鎖的加/解鎖具體實現,主要解決了以下幾個問題
? ? 1、死鎖問題:設置過期時間
? ? 2、可重入問題:重入+1, 釋放鎖-1,當值=0時代表完全釋放鎖
? ? 3、續約問題:可解決鎖提前釋放問題
? ? 4、鎖釋放:誰加鎖就由誰來釋放
總結:
本文由一個線上問題做引子,通過 redis 分布式鎖的常用實現方案,最終選定 redisson 的解決方案; 并分析 redisson 的具體實現細節
相關參考:
- Redisson官方文檔 - 分布式鎖和同步器
- Redisson官方文檔 - 配置方法
- CSDN - 如何使用Redis實現分布式鎖?
原文鏈接:https://blog.csdn.net/ldw201510803006/article/details/118141065
相關推薦
- 2022-05-11 Synchronized鎖優化
- 2022-07-14 React父子組件傳值(組件通信)的實現方法_React
- 2024-01-07 SpringSecurity Oauth2 解決 The bean ‘metaDataSourceA
- 2022-06-18 C++?詳細講解對象的構造順序_C 語言
- 2023-02-26 React?Fiber原理深入分析_React
- 2022-08-17 windows?server2008?開啟端口的實現方法_win服務器
- 2021-10-24 Linux多線程中fork與互斥鎖過程示例_Linux
- 2021-12-09 銀河麒麟4.0.2(Ubuntu)擴展boot分區過程介紹_Linux
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支