網(wǎng)站首頁 編程語言 正文
一、前提
最近在使用分布式鎖redisson時遇到一個線上問題:發(fā)現(xiàn)是subscriptionsPerConnection or subscriptionConnectionPoolSize
的大小不夠,需要提高配置才能解決。
二、源碼分析
下面對其源碼進(jìn)行分析,才能找到到底是什么邏輯導(dǎo)致問題所在:
1、RedissonLock#lock() 方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { ? ? ? ? long threadId = Thread.currentThread().getId(); ? ? ? ? // 嘗試獲取,如果ttl == null,則表示獲取鎖成功 ? ? ? ? Long ttl = tryAcquire(leaseTime, unit, threadId); ? ? ? ? // lock acquired ? ? ? ? if (ttl == null) { ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? // 訂閱鎖釋放事件,并通過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費(fèi)資源的問題 ? ? ? ? RFuture<RedissonLockEntry> future = subscribe(threadId); ? ? ? ? if (interruptibly) { ? ? ? ? ? ? commandExecutor.syncSubscriptionInterrupted(future); ? ? ? ? } else { ? ? ? ? ? ? commandExecutor.syncSubscription(future); ? ? ? ? } ? ? ? ? // 后面代碼忽略 ? ? ? ? try { ? ? ? ? ? ? // 無限循環(huán)獲取鎖,直到獲取鎖成功 ? ? ? ? ? ? // ... ? ? ? ? } finally { ? ? ? ? ? ? // 取消訂閱鎖釋放事件 ? ? ? ? ? ? unsubscribe(future, threadId); ? ? ? ? } }
總結(jié)下主要邏輯:
- 獲取當(dāng)前線程的線程id;
- tryAquire嘗試獲取鎖,并返回ttl
- 如果ttl為空,則結(jié)束流程;否則進(jìn)入后續(xù)邏輯;
- this.subscribe(threadId)訂閱當(dāng)前線程,返回一個RFuture;
- 如果在指定時間沒有監(jiān)聽到,則會產(chǎn)生如上異常。
- 訂閱成功后, 通過while(true)循環(huán),一直嘗試獲取鎖
- fially代碼塊,會解除訂閱
所以上述這情況問題應(yīng)該出現(xiàn)在subscribe()方法中
2、詳細(xì)看下subscribe()方法
protected RFuture<RedissonLockEntry> subscribe(long threadId) { ? ? // entryName 格式:“id:name”; ? ? // channelName 格式:“redisson_lock__channel:name”; ? ? return pubSub.subscribe(getEntryName(), getChannelName()); }
RedissonLock#pubSub 是在RedissonLock構(gòu)造函數(shù)中初始化的:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { ? ? // .... ? ? this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }
而subscribeService在MasterSlaveConnectionManager的實現(xiàn)中又是通過如下方式構(gòu)造的
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) { ? ? this(config, id); ? ? this.config = cfg; ? ? // 初始化 ? ? initTimer(cfg); ? ? initSingleEntry(); } protected void initTimer(MasterSlaveServersConfig config) { ? ? int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()}; ? ? Arrays.sort(timeouts); ? ? int minTimeout = timeouts[0]; ? ? if (minTimeout % 100 != 0) { ? ? ? ? minTimeout = (minTimeout % 100) / 2; ? ? } else if (minTimeout == 100) { ? ? ? ? minTimeout = 50; ? ? } else { ? ? ? ? minTimeout = 100; ? ? } ? ? timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false); ? ? connectionWatcher = new IdleConnectionWatcher(this, config); ? ? // 初始化:其中this就是MasterSlaveConnectionManager實例,config則為MasterSlaveServersConfig實例: ? ? subscribeService = new PublishSubscribeService(this, config); }
PublishSubscribeService構(gòu)造函數(shù)
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this); public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) { ? ? super(); ? ? this.connectionManager = connectionManager; ? ? this.config = config; ? ? for (int i = 0; i < locks.length; i++) { ? ? ? ? // 這里初始化了一組信號量,每個信號量的初始值為1 ? ? ? ? locks[i] = new AsyncSemaphore(1); ? ? } }
3、回到subscribe()方法主要邏輯還是交給了 LockPubSub#subscribe()里面
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>(); public RFuture<E> subscribe(String entryName, String channelName) { ? ? ? // 從PublishSubscribeService獲取對應(yīng)的信號量。 相同的channelName獲取的是同一個信號量 ? ? ?// public AsyncSemaphore getSemaphore(ChannelName channelName) { ? ? // ? ?return locks[Math.abs(channelName.hashCode() % locks.length)]; ? ? // } ? ? AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); ? ? AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); ? ? ? ? RPromise<E> newPromise = new RedissonPromise<E>() { ? ? ? ? @Override ? ? ? ? public boolean cancel(boolean mayInterruptIfRunning) { ? ? ? ? ? ? return semaphore.remove(listenerHolder.get()); ? ? ? ? } ? ? }; ? ? Runnable listener = new Runnable() { ? ? ? ? @Override ? ? ? ? public void run() { ? ? ? ? ? ? // ?如果存在RedissonLockEntry, 則直接利用已有的監(jiān)聽 ? ? ? ? ? ? E entry = entries.get(entryName); ? ? ? ? ? ? if (entry != null) { ? ? ? ? ? ? ? ? entry.acquire(); ? ? ? ? ? ? ? ? semaphore.release(); ? ? ? ? ? ? ? ? entry.getPromise().onComplete(new TransferListener<E>(newPromise)); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? ? ? E value = createEntry(newPromise); ? ? ? ? ? ? value.acquire(); ? ? ? ? ? ? E oldValue = entries.putIfAbsent(entryName, value); ? ? ? ? ? ? if (oldValue != null) { ? ? ? ? ? ? ? ? oldValue.acquire(); ? ? ? ? ? ? ? ? semaphore.release(); ? ? ? ? ? ? ? ? oldValue.getPromise().onComplete(new TransferListener<E>(newPromise)); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? ? ? // 創(chuàng)建監(jiān)聽, ? ? ? ? ? ? RedisPubSubListener<Object> listener = createListener(channelName, value); ? ? ? ? ? ? // 訂閱監(jiān)聽 ? ? ? ? ? ? service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); ? ? ? ? } ? ? }; ? ? // 最終會執(zhí)行l(wèi)istener.run方法 ? ? semaphore.acquire(listener); ? ? listenerHolder.set(listener); ? ? return newPromise; }
AsyncSemaphore#acquire()方法
public void acquire(Runnable listener) { ? ? acquire(listener, 1); } public void acquire(Runnable listener, int permits) { ? ? boolean run = false; ? ? synchronized (this) { ? ? ? ? // counter初始化值為1 ? ? ? ? if (counter < permits) { ? ? ? ? ? ? // 如果不是第一次執(zhí)行,則將listener加入到listeners集合中 ? ? ? ? ? ? listeners.add(new Entry(listener, permits)); ? ? ? ? ? ? return; ? ? ? ? } else { ? ? ? ? ? ? counter -= permits; ? ? ? ? ? ? run = true; ? ? ? ? } ? ? } ? ? // 第一次執(zhí)行acquire, 才會執(zhí)行l(wèi)istener.run()方法 ? ? if (run) { ? ? ? ? listener.run(); ? ? } }
梳理上述邏輯:
1、從PublishSubscribeService獲取對應(yīng)的信號量, 相同的channelName獲取的是同一個信號量
2、如果是第一次請求,則會立馬執(zhí)行l(wèi)istener.run()方法, 否則需要等上個線程獲取到該信號量執(zhí)行完方能執(zhí)行;
3、如果已經(jīng)存在RedissonLockEntry, 則利用已經(jīng)訂閱就行
4、如果不存在RedissonLockEntry, 則會創(chuàng)建新的RedissonLockEntry,然后進(jìn)行。
從上面代碼看,主要邏輯是交給了PublishSubscribeService#subscribe方法
4、PublishSubscribeService#subscribe邏輯如下:
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>(); private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>(); public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) { ? ? RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>(); ? ? // 主要邏輯入口, 這里要主要channelName每次都是新對象, 但內(nèi)部覆寫hashCode+equals。 ? ? subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners); ? ? return promise; } private void subscribe(Codec codec, ChannelName channelName, ?RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) { ? ? PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); ? ? if (connEntry != null) { ? ? ? ? // 從已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中 ? ? ? ? addListeners(channelName, promise, type, lock, connEntry, listeners); ? ? ? ? return; ? ? } ? ? // 沒有時,才是最重要的邏輯 ? ? freePubSubLock.acquire(new Runnable() { ? ? ? ? @Override ? ? ? ? public void run() { ? ? ? ? ? ? if (promise.isDone()) { ? ? ? ? ? ? ? ? lock.release(); ? ? ? ? ? ? ? ? freePubSubLock.release(); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? ? ? // 從隊列中取頭部元素 ? ? ? ? ? ? PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); ? ? ? ? ? ? if (freeEntry == null) { ? ? ? ? ? ? ? ? // 第一次肯定是沒有的需要建立 ? ? ? ? ? ? ? ? connect(codec, channelName, promise, type, lock, listeners); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? ? ? // 如果存在則嘗試獲取,如果remainFreeAmount小于0則拋出異常終止了。 ? ? ? ? ? ? int remainFreeAmount = freeEntry.tryAcquire(); ? ? ? ? ? ? if (remainFreeAmount == -1) { ? ? ? ? ? ? ? ? throw new IllegalStateException(); ? ? ? ? ? ? } ? ? ? ? ? ? PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); ? ? ? ? ? ? if (oldEntry != null) { ? ? ? ? ? ? ? ? freeEntry.release(); ? ? ? ? ? ? ? ? freePubSubLock.release(); ? ? ? ? ? ? ? ? addListeners(channelName, promise, type, lock, oldEntry, listeners); ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? } ? ? ? ? ? ? // 如果remainFreeAmount=0, 則從隊列中移除 ? ? ? ? ? ? if (remainFreeAmount == 0) { ? ? ? ? ? ? ? ? freePubSubConnections.poll(); ? ? ? ? ? ? } ? ? ? ? ? ? freePubSubLock.release(); ? ? ? ? ? ? // 增加監(jiān)聽 ? ? ? ? ? ? RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners); ? ? ? ? ? ? ChannelFuture future; ? ? ? ? ? ? if (PubSubType.PSUBSCRIBE == type) { ? ? ? ? ? ? ? ? future = freeEntry.psubscribe(codec, channelName); ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? future = freeEntry.subscribe(codec, channelName); ? ? ? ? ? ? } ? ? ? ? ? ? future.addListener(new ChannelFutureListener() { ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? public void operationComplete(ChannelFuture future) throws Exception { ? ? ? ? ? ? ? ? ? ? if (!future.isSuccess()) { ? ? ? ? ? ? ? ? ? ? ? ? if (!promise.isDone()) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? connectionManager.newTimeout(new TimerTask() { ? ? ? ? ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? ? ? ? ? public void run(Timeout timeout) throws Exception { ? ? ? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? }, config.getTimeout(), TimeUnit.MILLISECONDS); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? }); ? ? ? ? } ? ? }); } private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) { ? ? // 根據(jù)channelName計算出slot獲取PubSubConnection ? ? int slot = connectionManager.calcSlot(channelName.getName()); ? ? RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot); ? ? promise.onComplete((res, e) -> { ? ? ? ? if (e != null) { ? ? ? ? ? ? ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e); ? ? ? ? } ? ? }); ? ? connFuture.onComplete((conn, e) -> { ? ? ? ? if (e != null) { ? ? ? ? ? ? freePubSubLock.release(); ? ? ? ? ? ? lock.release(); ? ? ? ? ? ? promise.tryFailure(e); ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? // 這里會從配置中讀取subscriptionsPerConnection ? ? ? ? PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); ? ? ? ? // 每獲取一次,subscriptionsPerConnection就會減直到為0 ? ? ? ? int remainFreeAmount = entry.tryAcquire(); ? ? ? ? // 如果舊的存在,則將現(xiàn)有的entry釋放,然后將listeners加入到oldEntry中 ? ? ? ? PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); ? ? ? ? if (oldEntry != null) { ? ? ? ? ? ? releaseSubscribeConnection(slot, entry); ? ? ? ? ? ? freePubSubLock.release(); ? ? ? ? ? ? addListeners(channelName, promise, type, lock, oldEntry, listeners); ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? if (remainFreeAmount > 0) { ? ? ? ? ? ? // 加入到隊列中 ? ? ? ? ? ? freePubSubConnections.add(entry); ? ? ? ? } ? ? ? ? freePubSubLock.release(); ? ? ? ? RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners); ? ? ? ? // 這里真正的進(jìn)行訂閱(底層與redis交互) ? ? ? ? ChannelFuture future; ? ? ? ? if (PubSubType.PSUBSCRIBE == type) { ? ? ? ? ? ? future = entry.psubscribe(codec, channelName); ? ? ? ? } else { ? ? ? ? ? ? future = entry.subscribe(codec, channelName); ? ? ? ? } ? ? ? ? future.addListener(new ChannelFutureListener() { ? ? ? ? ? ? @Override ? ? ? ? ? ? public void operationComplete(ChannelFuture future) throws Exception { ? ? ? ? ? ? ? ? if (!future.isSuccess()) { ? ? ? ? ? ? ? ? ? ? if (!promise.isDone()) { ? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? connectionManager.newTimeout(new TimerTask() { ? ? ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? ? ? public void run(Timeout timeout) throws Exception { ? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? }, config.getTimeout(), TimeUnit.MILLISECONDS); ? ? ? ? ? ? } ? ? ? ? }); ? ? }); }
PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每個連接的最大訂閱數(shù)。當(dāng)tryAcqcurie的時候會減少這個數(shù)量:
?public int tryAcquire() { ????while (true) { ????????int value = subscribedChannelsAmount.get(); ????????if (value == 0) { ????????????return -1; ????????} ????????if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { ????????????return value - 1; ????????} ????} }
梳理上述邏輯:
1、還是進(jìn)行重復(fù)判斷, 根據(jù)channelName從name2PubSubConnection中獲取,看是否存在已經(jīng)訂閱:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、從隊列freePubSubConnections中取公用的PubSubConnectionEntry, 如果沒有就進(jìn)入connect()方法
2.1 會根據(jù)subscriptionsPerConnection創(chuàng)建PubSubConnectionEntry, 然后調(diào)用其tryAcquire()方法 - 每調(diào)用一次就會減1
2.2 將新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后續(xù)重復(fù)使用;
2.3 同時也將PubSubConnectionEntry放入隊列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是進(jìn)行底層的subscribe和addListener
3、如果已經(jīng)存在PubSubConnectionEntry,則利用已有的PubSubConnectionEntry進(jìn)行tryAcquire;
4、如果remainFreeAmount < 0 會拋出IllegalStateException異常;如果remainFreeAmount=0,則會將其從隊列中移除, 那么后續(xù)請求會重新獲取一個可用的連接
5、最后也是進(jìn)行底層的subscribe和addListener;
三 總結(jié)
根因: 從上面代碼分析, 導(dǎo)致問題的根因是因為PublishSubscribeService 會使用公共隊列中的freePubSubConnections, 如果同一個key一次性請求超過subscriptionsPerConnection它的默認(rèn)值5時,remainFreeAmount就可能出現(xiàn)-1的情況, 那么就會導(dǎo)致commandExecutor.syncSubscription(future)中等待超時,也就拋出如上異常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
解決方法: 在初始化Redisson可以可指定這個配置項的值。
相關(guān)參數(shù)的解釋以及默認(rèn)值請參考官網(wǎng):https://github.com/redisson/redisson/wiki/2.-Configuration#23-common-settings
原文鏈接:https://www.cnblogs.com/yuanfy008/p/15799743.html
相關(guān)推薦
- 2022-08-04 Python中reduce函數(shù)詳解_python
- 2023-07-29 iview的表格實現(xiàn)單元格行編輯功能
- 2022-10-17 C++模擬實現(xiàn)vector的示例代碼_C 語言
- 2022-12-26 C語言逆向分析語法超詳細(xì)分析_C 語言
- 2022-04-16 python基礎(chǔ)之定義類和對象詳解_python
- 2022-05-20 python繪制餅圖的方法詳解_python
- 2023-12-21 JDBC中ResultSet的使用
- 2022-11-13 kvm?透傳顯卡至win10虛擬機(jī)的方法_Kvm
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支