1. Registration
NacosWatch is registered as a bean unless spring.cloud.nacos.discovery.watch.enabled is set to false (matchIfMissing = true makes it enabled by default).
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
        matchIfMissing = true)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
        NacosDiscoveryProperties nacosDiscoveryProperties) {
    return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
}
2. NacosWatch
Constructor
public NacosWatch(NacosServiceManager nacosServiceManager,
        NacosDiscoveryProperties properties) {
    this.nacosServiceManager = nacosServiceManager;
    this.properties = properties;
    this.taskScheduler = getTaskScheduler();
}
The constructor creates a thread pool, a ThreadPoolTaskScheduler:
private final ThreadPoolTaskScheduler taskScheduler;

private static ThreadPoolTaskScheduler getTaskScheduler() {
    ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
    taskScheduler.initialize();
    return taskScheduler;
}
The scheduler's bean name is Nacos-Watch-Task-Scheduler.
3. Startup
Type hierarchy of NacosWatch
public class NacosWatch
        implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean
NacosWatch implements the SmartLifecycle interface, so the Spring container calls its lifecycle methods.
The start method
@Override
public void start() {
    // Guard flag: private final AtomicBoolean running = new AtomicBoolean(false);
    // compareAndSet is a CAS, so only one thread ever gets to execute the body.
    if (this.running.compareAndSet(false, true)) {
        // Create an event listener; it listens for NamingEvent.
        // Listener cache: private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
        // The key has the form spring.application.name:group.
        EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                event -> new EventListener() {
                    @Override
                    public void onEvent(Event event) {
                        // Listener callback: handle only NamingEvent.
                        if (event instanceof NamingEvent) {
                            // Read the instance list from the event.
                            List<Instance> instances = ((NamingEvent) event)
                                    .getInstances();
                            // Find the instance that represents this application.
                            Optional<Instance> instanceOptional = selectCurrentInstance(
                                    instances);
                            // If it is present...
                            instanceOptional.ifPresent(currentInstance -> {
                                // ...reset the metadata if it differs.
                                resetIfNeeded(currentInstance);
                            });
                        }
                    }
                });
        // Obtain the NamingService.
        NamingService namingService = nacosServiceManager
                .getNamingService(properties.getNacosProperties());
        try {
            // Subscribe.
            namingService.subscribe(properties.getService(), properties.getGroup(),
                    Arrays.asList(properties.getClusterName()), eventListener);
        }
        catch (Exception e) {
            log.error("namingService subscribe failed, properties:{}", properties, e);
        }
        // Start the periodic task; by default it runs every 30 seconds.
        this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                this::nacosServicesWatch, this.properties.getWatchDelay());
    }
}
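The start-once guard above can be sketched in isolation. This is a minimal illustration, not the NacosWatch code: the class StartOnceWatch, the Runnable placeholder listener, and the subscribeCalls counter are hypothetical names introduced here. It shows that compareAndSet(false, true) succeeds for exactly one caller and that computeIfAbsent creates the listener only once per key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for NacosWatch; only the start-once logic is modeled.
class StartOnceWatch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Map<String, Runnable> listenerMap = new ConcurrentHashMap<>(16);
    private final AtomicInteger subscribeCalls = new AtomicInteger();

    // Placeholder listener factory; the real code builds an EventListener here.
    private static Runnable newListener(String key) {
        return () -> { };
    }

    // Returns true only for the single caller that wins the CAS and performs the start.
    boolean start(String service, String group) {
        if (!running.compareAndSet(false, true)) {
            return false; // already started by an earlier caller
        }
        String key = String.join(":", service, group);
        listenerMap.computeIfAbsent(key, StartOnceWatch::newListener);
        subscribeCalls.incrementAndGet(); // stands in for namingService.subscribe(...)
        return true;
    }

    int subscribeCalls() {
        return subscribeCalls.get();
    }

    public static void main(String[] args) {
        StartOnceWatch watch = new StartOnceWatch();
        System.out.println(watch.start("demo-service", "DEFAULT_GROUP")); // true
        System.out.println(watch.start("demo-service", "DEFAULT_GROUP")); // false
        System.out.println(watch.subscribeCalls());                       // 1
    }
}
```

A second call to start is a no-op, which is why the subscription and the scheduled task are set up only once even if the lifecycle callback is invoked again.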
Resetting the metadata
private void resetIfNeeded(Instance instance) {
    if (!properties.getMetadata().equals(instance.getMetadata())) {
        properties.setMetadata(instance.getMetadata());
    }
}
Publishing the HeartbeatEvent
public void nacosServicesWatch() {
    // Called by the periodic task started above; the default interval is 30 seconds.
    // nacos doesn't support watch now , publish an event every 30 seconds.
    this.publisher.publishEvent(
            new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
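The heartbeat mechanism is a fixed-delay task that publishes an ever-increasing index. The following plain-Java sketch mimics that shape without any Spring types. HeartbeatTicker, firstBeats, and the 10 ms demo delay are assumptions made for illustration; the real code uses ThreadPoolTaskScheduler and publishes a HeartbeatEvent.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical stand-in for the NacosWatch heartbeat; no Spring types are used.
class HeartbeatTicker {
    private final AtomicLong index = new AtomicLong(0);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Hands index 0, 1, 2, ... to the subscriber, waiting delayMillis between runs.
    void start(LongConsumer subscriber, long delayMillis) {
        scheduler.scheduleWithFixedDelay(
                () -> subscriber.accept(index.getAndIncrement()),
                0, delayMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Collects the first `count` heartbeat indexes, using a short delay for the demo.
    static List<Long> firstBeats(int count) {
        List<Long> seen = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(count);
        HeartbeatTicker ticker = new HeartbeatTicker();
        ticker.start(i -> {
            if (seen.size() < count) {
                seen.add(i);
            }
            latch.countDown();
        }, 10);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            ticker.stop();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(firstBeats(3)); // [0, 1, 2]
    }
}
```

Consumers of such a heartbeat can compare the index with the last one they saw to decide whether to refresh.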
4. NamingService subscription
The listener argument is the EventListener built above, the one that handles NamingEvent.
//com.alibaba.nacos.client.naming.NacosNamingService#subscribe
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    // Build clusterString by joining all cluster names with commas.
    String clusterString = StringUtils.join(clusters, ",");
    // Register the listener.
    // private InstancesChangeNotifier changeNotifier; constructed in the init method
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    // Subscribe.
    // private NamingClientProxy clientProxy;
    // A NamingClientProxyDelegate, constructed in the init method
    clientProxy.subscribe(serviceName, groupName, clusterString);
}
4.1. InstancesChangeNotifier registers the listener
//com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#registerListener
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    // The key has the form group@@service@@clusters.
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    // Look up the listener set in the cache:
    // private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    // Double-checked locking: create the set once, then add the listener to it.
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}
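The get, lock, re-check, put sequence above is a create-if-absent operation. As a point of comparison, and not as the Nacos implementation, the same effect can be sketched with ConcurrentHashMap.computeIfAbsent. ListenerRegistry and its methods are hypothetical names; the key helper follows the group@@service@@clusters format described above, and omitting the clusters part when it is empty is an assumption.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry illustrating create-if-absent without explicit locking.
class ListenerRegistry<L> {
    private final Map<String, Set<L>> listenerMap = new ConcurrentHashMap<>();

    // Builds group@@service@@clusters; dropping empty clusters is an assumption.
    static String key(String groupName, String serviceName, String clusters) {
        String grouped = groupName + "@@" + serviceName;
        return (clusters == null || clusters.isEmpty()) ? grouped : grouped + "@@" + clusters;
    }

    void register(String groupName, String serviceName, String clusters, L listener) {
        // computeIfAbsent atomically creates the set the first time a key is seen.
        listenerMap
                .computeIfAbsent(key(groupName, serviceName, clusters),
                        k -> ConcurrentHashMap.newKeySet())
                .add(listener);
    }

    int listenerCount(String groupName, String serviceName, String clusters) {
        return listenerMap
                .getOrDefault(key(groupName, serviceName, clusters), Collections.emptySet())
                .size();
    }

    public static void main(String[] args) {
        ListenerRegistry<String> registry = new ListenerRegistry<>();
        registry.register("DEFAULT_GROUP", "demo-service", "c1", "listenerA");
        registry.register("DEFAULT_GROUP", "demo-service", "c1", "listenerB");
        registry.register("DEFAULT_GROUP", "demo-service", "c1", "listenerA"); // duplicate
        System.out.println(key("DEFAULT_GROUP", "demo-service", "c1"));
        System.out.println(registry.listenerCount("DEFAULT_GROUP", "demo-service", "c1")); // 2
    }
}
```

Because the value is a set, registering the same listener twice leaves a single entry, which matches the behavior of the ConcurrentHashSet in the original.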
4.2. NamingClientProxyDelegate subscription
//com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#subscribe
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // group@@service
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    // group@@service@@clusters
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // Schedule the periodic service info update (see 4.2.1).
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // Check whether the serviceInfoMap in ServiceInfoHolder already has an entry.
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    // If not, subscribe over gRPC and use the returned ServiceInfo (see 4.2.2).
    if (null == result) {
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // Process the service info (see 4.2.3).
    serviceInfoHolder.processServiceInfo(result);
    return result;
}
4.2.1. Scheduling the service info update
//com.alibaba.nacos.client.naming.core.ServiceInfoUpdateService#scheduleUpdateIfAbsent
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    // group@@service@@clusters
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    // If the cache already has an entry, a task was added earlier; do not add it again.
    // private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    // Lock, then check again.
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        // Schedule an UpdateTask.
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        // Cache the future.
        futureMap.put(serviceKey, future);
    }
}
Adding the UpdateTask
//com.alibaba.nacos.client.naming.core.ServiceInfoUpdateService#addTask
//private final ScheduledExecutorService executor;
//Thread name: com.alibaba.nacos.client.naming.updater
//The task is delayed by one second (DEFAULT_DELAY).
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder,
        NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) {
    this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
            new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
    this.serviceInfoHolder = serviceInfoHolder;
    this.namingClientProxy = namingClientProxy;
    this.changeNotifier = changeNotifier;
}
4.2.2. gRPC subscription
//com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // Cache the subscription in the redo service; its registered flag starts as false.
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    // Return the subscription result.
    return doSubscribe(serviceName, groupName, clusters);
}

//com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribe
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // Build the SubscribeServiceRequest.
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
            true);
    // Send it and read the response.
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    // Set the redo entry's registered flag (cached above) to true.
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    // Return the subscription result.
    return response.getServiceInfo();
}
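The two redo calls above bracket the server request: the subscription is recorded as not yet registered before the request and marked as registered after it succeeds. The sketch below models only that bookkeeping. SubscriberRedoCache, onDisconnect, and pendingRedo are hypothetical names, and the idea that a lost connection resets every flag so the subscriptions can be replayed is an assumption about why such a flag is useful, not something shown in the code above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical model of the "registered" flag kept per subscription.
class SubscriberRedoCache {
    // key -> true once the server has confirmed the subscription
    private final Map<String, Boolean> registered = new ConcurrentHashMap<>();

    // Called before the request is sent: remember the subscription as unconfirmed.
    void cacheSubscriberForRedo(String key) {
        registered.put(key, false);
    }

    // Called after the request succeeded: mark the subscription as confirmed.
    void subscriberRegistered(String key) {
        registered.replace(key, true);
    }

    // Assumption: after a connection loss every subscription must be confirmed again.
    void onDisconnect() {
        registered.replaceAll((key, flag) -> false);
    }

    // Keys that would have to be re-subscribed, in sorted order.
    List<String> pendingRedo() {
        return registered.entrySet().stream()
                .filter(entry -> !entry.getValue())
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SubscriberRedoCache cache = new SubscriberRedoCache();
        cache.cacheSubscriberForRedo("DEFAULT_GROUP@@demo-service");
        System.out.println(cache.pendingRedo()); // [DEFAULT_GROUP@@demo-service]
        cache.subscriberRegistered("DEFAULT_GROUP@@demo-service");
        System.out.println(cache.pendingRedo()); // []
        cache.onDisconnect();
        System.out.println(cache.pendingRedo()); // [DEFAULT_GROUP@@demo-service]
    }
}
```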
4.2.3. Processing the service info
//com.alibaba.nacos.client.naming.cache.ServiceInfoHolder#processServiceInfo
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    // Get the service info currently stored.
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    // If the received serviceInfo is empty, or it contains no healthy instances, return the old value.
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    // Overwrite the old service info with the new one.
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // Determine whether the service has changed.
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    // If the new serviceInfo has no jsonFromServer, store its own JSON serialization.
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    // Metrics.
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    // On change: publish an InstancesChangeEvent and refresh the disk cache.
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
Determining whether the service info has changed
private boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
    // No stored service info yet: return true, because the caller has already rejected an empty new value.
    if (null == oldService) {
        NAMING_LOGGER.info("init new ips(" + newService.ipCount() + ") service: " + newService.getKey() + " -> "
                + JacksonUtils.toJson(newService.getHosts()));
        return true;
    }
    if (oldService.getLastRefTime() > newService.getLastRefTime()) {
        NAMING_LOGGER
                .warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + newService
                        .getLastRefTime());
    }
    boolean changed = false;
    // Index the stored instances by address.
    Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
    for (Instance host : oldService.getHosts()) {
        oldHostMap.put(host.toInetAddr(), host);
    }
    // Index the newly received instances by address.
    Map<String, Instance> newHostMap = new HashMap<String, Instance>(newService.getHosts().size());
    for (Instance host : newService.getHosts()) {
        newHostMap.put(host.toInetAddr(), host);
    }
    // Modified, new, and removed instances.
    Set<Instance> modHosts = new HashSet<Instance>();
    Set<Instance> newHosts = new HashSet<Instance>();
    Set<Instance> remvHosts = new HashSet<Instance>();
    // Copy the new instance entries into a list.
    List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
            newHostMap.entrySet());
    // Iterate over the new instances.
    for (Map.Entry<String, Instance> entry : newServiceHosts) {
        Instance host = entry.getValue();
        String key = entry.getKey();
        // Instance overrides toString; if the stored and new instances at the same address differ,
        // add the new instance to modHosts.
        if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
            modHosts.add(host);
            continue;
        }
        // If the stored instances do not contain this address, add the instance to newHosts.
        if (!oldHostMap.containsKey(key)) {
            newHosts.add(host);
        }
    }
    // Iterate over the stored instances.
    for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
        Instance host = entry.getValue();
        String key = entry.getKey();
        // If the new instance map does not contain a stored address, add that instance to remvHosts.
        if (newHostMap.containsKey(key)) {
            continue;
        }
        if (!newHostMap.containsKey(key)) {
            remvHosts.add(host);
        }
    }
    // Any element in newHosts means the instance list has changed.
    if (newHosts.size() > 0) {
        changed = true;
        NAMING_LOGGER
                .info("new ips(" + newHosts.size() + ") service: " + newService.getKey() + " -> " + JacksonUtils
                        .toJson(newHosts));
    }
    // Any element in remvHosts means the instance list has changed.
    if (remvHosts.size() > 0) {
        changed = true;
        NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + newService.getKey() + " -> "
                + JacksonUtils.toJson(remvHosts));
    }
    // Any element in modHosts means the instance list has changed.
    if (modHosts.size() > 0) {
        changed = true;
        NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + newService.getKey() + " -> "
                + JacksonUtils.toJson(modHosts));
    }
    return changed;
}
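The comparison above boils down to a three-way diff of two maps keyed by address. A compact sketch of the same idea follows, assuming each instance can be reduced to an address plus a description string that plays the role of Instance.toString(). InstanceDiff and its fields are hypothetical names, and the addresses are made-up sample data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical three-way diff of two address -> description maps.
class InstanceDiff {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> modified = new TreeSet<>();

    static InstanceDiff between(Map<String, String> oldHosts, Map<String, String> newHosts) {
        InstanceDiff diff = new InstanceDiff();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String previous = oldHosts.get(entry.getKey());
            if (previous == null) {
                diff.added.add(entry.getKey());      // address unknown before
            } else if (!previous.equals(entry.getValue())) {
                diff.modified.add(entry.getKey());   // same address, different description
            }
        }
        for (String address : oldHosts.keySet()) {
            if (!newHosts.containsKey(address)) {
                diff.removed.add(address);           // address no longer reported
            }
        }
        return diff;
    }

    // Mirrors the "changed" flag: any non-empty set counts as a change.
    boolean changed() {
        return !(added.isEmpty() && removed.isEmpty() && modified.isEmpty());
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("10.0.0.1:8080", "weight=1");
        oldHosts.put("10.0.0.2:8080", "weight=1");
        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("10.0.0.1:8080", "weight=2");
        newHosts.put("10.0.0.3:8080", "weight=1");

        InstanceDiff diff = between(oldHosts, newHosts);
        System.out.println("added=" + diff.added);       // added=[10.0.0.3:8080]
        System.out.println("removed=" + diff.removed);   // removed=[10.0.0.2:8080]
        System.out.println("modified=" + diff.modified); // modified=[10.0.0.1:8080]
        System.out.println("changed=" + diff.changed()); // changed=true
    }
}
```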
5. UpdateTask
Section 4.2.1 scheduled an UpdateTask on the executor with a one-second delay and cached the returned ScheduledFuture. This section looks at what the task does.
UpdateTask is an inner class of ServiceInfoUpdateService. It implements Runnable, so it is a task executed on the executor's threads rather than a thread of its own.
public class UpdateTask implements Runnable {

    long lastRefTime = Long.MAX_VALUE;

    private final String serviceName;

    private final String groupName;

    private final String clusters;

    private final String groupedServiceName;

    private final String serviceKey;

    // Incremented when (1) the server cannot be reached or (2) the returned instance list is empty.
    private int failCount = 0;

    // Constructor: initializes the fields.
    public UpdateTask(String serviceName, String groupName, String clusters) {
        this.serviceName = serviceName;
        this.groupName = groupName;
        this.clusters = clusters;
        this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
    }

    @Override
    public void run() {
        // Delay before the next run.
        long delayTime = DEFAULT_DELAY;
        try {
            // Stop if nothing is subscribed and futureMap has no entry for this key.
            if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
                NAMING_LOGGER
                        .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
                return;
            }
            // Get the service info currently stored.
            ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
            // Nothing stored yet.
            if (serviceObj == null) {
                // Query the instances from the server.
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // Process the service info.
                serviceInfoHolder.processServiceInfo(serviceObj);
                // Remember lastRefTime.
                lastRefTime = serviceObj.getLastRefTime();
                return;
            }
            // If the stored info's lastRefTime <= the lastRefTime cached by this task,
            // the stored info has not been refreshed since the last run, so pull it again.
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                serviceInfoHolder.processServiceInfo(serviceObj);
            }
            // Remember the latest lastRefTime.
            lastRefTime = serviceObj.getLastRefTime();
            // If the instance list is empty, increment the failure count (capped at 6).
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                incFailCount();
                return;
            }
            // Pull succeeded: set delayTime to cacheMillis * 6 and reset the failure count.
            // TODO multiple time can be configured.
            delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
            resetFailCount();
        } catch (Throwable e) {
            incFailCount();
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
        } finally {
            // Reschedule with delay = min(delayTime << failCount, 60 s).
            // After a successful pull, delayTime is cacheMillis * 6 (6 s when cacheMillis is 1 s) and failCount is 0.
            // After a failure, delayTime stays at 1 s and failCount is the number of failures, which gives an exponential backoff.
            // At 6 failures the shift yields 64 s, so the 60 s cap is used as the retry delay.
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }

    private void incFailCount() {
        int limit = 6;
        if (failCount == limit) {
            return;
        }
        failCount++;
    }

    private void resetFailCount() {
        failCount = 0;
    }
}
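The rescheduling formula in the finally block can be checked with a few lines of arithmetic. The sketch below reproduces only the expression Math.min(delayTime << failCount, DEFAULT_DELAY * 60), assuming DEFAULT_DELAY is 1000 ms as described in 4.2.1. UpdateBackoff and nextDelayMillis are hypothetical names.

```java
// Hypothetical helper that isolates the retry-delay arithmetic of UpdateTask.
class UpdateBackoff {
    static final long DEFAULT_DELAY = 1000L; // one second, as stated in section 4.2.1
    static final int MAX_FAIL_COUNT = 6;     // cap applied by incFailCount

    // Doubles the delay per failure and caps the result at 60 seconds.
    static long nextDelayMillis(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        // Failure path: delayTime stays at DEFAULT_DELAY and failCount grows.
        // Prints 1000, 2000, 4000, 8000, 16000, 32000, 60000 (64000 is capped).
        for (int failCount = 0; failCount <= MAX_FAIL_COUNT; failCount++) {
            System.out.println(failCount + " failures -> "
                    + nextDelayMillis(DEFAULT_DELAY, failCount) + " ms");
        }
        // Success path: failCount is 0 and delayTime is cacheMillis * 6.
        System.out.println("success -> " + nextDelayMillis(1000L * 6, 0) + " ms"); // 6000 ms
    }
}
```

The cap matters only at the sixth failure, where 1000 << 6 is 64000 ms and the minimum with 60000 ms wins.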
//com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}

//com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#queryInstancesOfService
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    // Ask the server for the instance information.
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}
Both section 5 and section 4.2 call serviceInfoHolder.processServiceInfo(...). The periodic call in UpdateTask appears to be a safety net in case a change was missed.
6. Listening for InstancesChangeEvent
//com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#onEvent
public void onEvent(InstancesChangeEvent event) {
    String key = ServiceInfo
            .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (CollectionUtils.isEmpty(eventListeners)) {
        return;
    }
    for (final EventListener listener : eventListeners) {
        final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
        if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
            ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
        } else {
            listener.onEvent(namingEvent);
        }
    }
}

private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent(
        InstancesChangeEvent instancesChangeEvent) {
    return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
            instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
}
InstancesChangeNotifier wraps each InstancesChangeEvent in a NamingEvent, so the NamingEvent listener created in NacosWatch#start receives it and, through resetIfNeeded, updates the Metadata value held in NacosWatch#properties.
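The dispatch loop in onEvent runs a listener on its own executor when it supplies one and on the publishing thread otherwise. A self-contained sketch of that rule follows. DemoListener, ExecutorAwareListener, and ListenerDispatcher are hypothetical stand-ins for EventListener, AbstractEventListener, and InstancesChangeNotifier, and the demo executor runs tasks inline only so that the output order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical counterpart of EventListener.
interface DemoListener {
    void onEvent(String event);
}

// Hypothetical counterpart of AbstractEventListener: may supply its own executor.
abstract class ExecutorAwareListener implements DemoListener {
    // Returning null means "run on the publisher's thread".
    Executor getExecutor() {
        return null;
    }
}

class ListenerDispatcher {
    static void dispatch(List<DemoListener> listeners, String event) {
        for (DemoListener listener : listeners) {
            Executor executor = listener instanceof ExecutorAwareListener
                    ? ((ExecutorAwareListener) listener).getExecutor()
                    : null;
            if (executor != null) {
                executor.execute(() -> listener.onEvent(event)); // hand off to the listener's executor
            } else {
                listener.onEvent(event);                         // call directly
            }
        }
    }

    // Runs one plain listener and one executor-aware listener and returns what happened.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Inline executor that records its use; a real one would use another thread.
        Executor recording = task -> {
            log.add("via-executor");
            task.run();
        };
        List<DemoListener> listeners = new ArrayList<>();
        listeners.add(event -> log.add("direct:" + event));
        listeners.add(new ExecutorAwareListener() {
            @Override
            Executor getExecutor() {
                return recording;
            }

            @Override
            public void onEvent(String event) {
                log.add("offloaded:" + event);
            }
        });
        dispatch(listeners, "changed");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [direct:changed, via-executor, offloaded:changed]
    }
}
```

The listener built in NacosWatch#start is a plain EventListener, so under this rule it is called directly on the thread that delivers the event.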
7. HeartbeatEvent
Spring Cloud Gateway listens for this event:
org.springframework.cloud.gateway.route.RouteRefreshListener#onApplicationEvent
The Dubbo integration (cloud.dubbo) also listens for it:
com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration#onHeartbeatEvent
Original article: https://blog.csdn.net/xuwenjingrenca/article/details/125116719