日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達者為師

網(wǎng)站首頁 編程語言 正文

Spring Cloud Nacos NacosWatch

作者:喜歡小蘋果的碼農(nóng) 更新時間: 2022-06-08 編程語言

1、注冊

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
      matchIfMissing = true)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
      NacosDiscoveryProperties nacosDiscoveryProperties) {
   return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
}

2、NacosWatch

構(gòu)造器

public NacosWatch(NacosServiceManager nacosServiceManager,
      NacosDiscoveryProperties properties) {
   this.nacosServiceManager = nacosServiceManager;
   this.properties = properties;
   this.taskScheduler = getTaskScheduler();
}

構(gòu)造器中創(chuàng)建了一個線程池

private final ThreadPoolTaskScheduler taskScheduler;
private static ThreadPoolTaskScheduler getTaskScheduler() {
    ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
    taskScheduler.initialize();
    return taskScheduler;
}

Scheduler的名稱是:Nacos-Watch-Task-Scheduler

3、啟動

NacosWatch的繼承關(guān)系

public class NacosWatch
      implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean

NacosWatch實現(xiàn)了SmartLifecycle接口,實現(xiàn)了spring容器的生命周期的方法。

start方法

@Override
public void start() {
    //private final AtomicBoolean running = new AtomicBoolean(false); 標識符
    //這里用了cas方法,只會有一個線程獲取到執(zhí)行代碼的權(quán)限
   if (this.running.compareAndSet(false, true)) {
       //生成一個事件監(jiān)聽,這里監(jiān)聽的是NamingEvent
       //private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);	監(jiān)聽器緩存
       //key的格式是 spring.application.name:group
      EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
            event -> new EventListener() {
               @Override
               public void onEvent(Event event) {
                   //監(jiān)聽器回調(diào)  判斷是否NamingEvent 事件
                  if (event instanceof NamingEvent) {
                      //從事件中獲取服務(wù)列表
                     List<Instance> instances = ((NamingEvent) event)
                           .getInstances();
                      //獲取當前服務(wù)
                     Optional<Instance> instanceOptional = selectCurrentInstance(
                           instances);
                      //如果存在
                     instanceOptional.ifPresent(currentInstance -> {
                        //是否需要重設(shè)matadata
                         resetIfNeeded(currentInstance);
                     });
                  }
               }
            });
		//獲取NamingService
      NamingService namingService = nacosServiceManager
            .getNamingService(properties.getNacosProperties());
      try {
          //訂閱
         namingService.subscribe(properties.getService(), properties.getGroup(),
               Arrays.asList(properties.getClusterName()), eventListener);
      }
      catch (Exception e) {
         log.error("namingService subscribe failed, properties:{}", properties, e);
      }
		//啟動定時器,默認30秒執(zhí)行一次
      this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
            this::nacosServicesWatch, this.properties.getWatchDelay());
   }
}

重設(shè)Metadata

private void resetIfNeeded(Instance instance) {
   if (!properties.getMetadata().equals(instance.getMetadata())) {
      properties.setMetadata(instance.getMetadata());
   }
}

發(fā)布HeartbeatEvent

public void nacosServicesWatch() {
	//由上面的定時器定時發(fā)布事件	默認30秒
   // nacos doesn't support watch now , publish an event every 30 seconds.
   this.publisher.publishEvent(
         new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));

}

4、NamingService的訂閱

入?yún)⑹巧厦鏄?gòu)建的EventListener,即NamingEvent

//com.alibaba.nacos.client.naming.NacosNamingService#subscribe
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    //獲取clusterString,將所有cluster按,拼接
    String clusterString = StringUtils.join(clusters, ",");
    //注冊listener
    //private InstancesChangeNotifier changeNotifier;	在init方法中構(gòu)造
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    //訂閱
    //private NamingClientProxy clientProxy;
    //NamingClientProxyDelegate	在init方法中構(gòu)造
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

4.1、InstancesChangeNotifier注冊listener

//com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#registerListener
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    //這樣的格式 group@@service@@clusters
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    //從緩存中獲取
    //private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    //雙重檢查鎖,將listener放入緩存
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<EventListener>();
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}

4.2、NamingClientProxyDelegate 的訂閱

//com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#subscribe
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    //group@@service
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    //group@@service@@clusters
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    //服務(wù)信息修改
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    //判斷ServiceInfo中的serviceInfoMap有沒有
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    //沒有,grpc的訂閱ServiceInfo,并返回
    if (null == result) {
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    //處理服務(wù)
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

4.2.1、服務(wù)信息修改

//com.alibaba.nacos.client.naming.core.ServiceInfoUpdateService#scheduleUpdateIfAbsent
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    //group@@service@@clusters
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    //緩存,如果有,代表已經(jīng)添加了task,不用重復(fù)添加
    //private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    //加鎖
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        //添加定時任務(wù)  UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        //放入緩存
        futureMap.put(serviceKey, future);
    }
}

添加UpdateTask

//com.alibaba.nacos.client.naming.core.ServiceInfoUpdateService#addTask
//private final ScheduledExecutorService executor;
//名稱為:com.alibaba.nacos.client.naming.updater
//延遲一秒
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
public ServiceInfoUpdateService(Properties properties, ServiceInfoHolder serviceInfoHolder,
            NamingClientProxy namingClientProxy, InstancesChangeNotifier changeNotifier) {
        this.executor = new ScheduledThreadPoolExecutor(initPollingThreadCount(properties),
                new NameThreadFactory("com.alibaba.nacos.client.naming.updater"));
    this.serviceInfoHolder = serviceInfoHolder;
    this.namingClientProxy = namingClientProxy;
    this.changeNotifier = changeNotifier;
}

4.2.2、grpc的 訂閱

//com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    //redo中放入ServiceInfo	redo標志為false
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    //返回訂閱結(jié)果
    return doSubscribe(serviceName, groupName, clusters);
}
//com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribe
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    //構(gòu)建SubscribeServiceRequest
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
            true);
    //獲取請求結(jié)果
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    //將上面的redo的標志設(shè)置為true
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    //返回訂閱結(jié)果
    return response.getServiceInfo();
}

4.2.3、處理服務(wù)

//com.alibaba.nacos.client.naming.cache.ServiceInfoHolder#processServiceInfo
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    //獲取當前保存的服務(wù)信息
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    //訂閱獲得的serviceInfo為空,或者獲取到的健康的實例數(shù)量是空,直接返回
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    //將新的服務(wù)信息覆蓋舊的服務(wù)信息
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    //是否服務(wù)有變更
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    //如果當前保存的JsonFromServer為空,直接保存為新的實例的JsonFromServer
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    //監(jiān)控信息
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    //服務(wù)有變更  發(fā)布InstancesChangeEvent 事件,并且刷新磁盤緩存
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JacksonUtils.toJson(serviceInfo.getHosts()));
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

判斷服務(wù)信息是否有變更

private boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newService) {
    //當前服務(wù)信息為空,直接返回true,因為上面方法中有判斷新獲取的服務(wù)信息是否為空
    if (null == oldService) {
        NAMING_LOGGER.info("init new ips(" + newService.ipCount() + ") service: " + newService.getKey() + " -> "
                + JacksonUtils.toJson(newService.getHosts()));
        return true;
    }
    if (oldService.getLastRefTime() > newService.getLastRefTime()) {
        NAMING_LOGGER
                .warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + newService
                        .getLastRefTime());
    }
    boolean changed = false;
    //將當前的服務(wù)信息組裝成map
    Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
    for (Instance host : oldService.getHosts()) {
        oldHostMap.put(host.toInetAddr(), host);
    }
    //將新獲得的服務(wù)信息組裝成map
    Map<String, Instance> newHostMap = new HashMap<String, Instance>(newService.getHosts().size());
    for (Instance host : newService.getHosts()) {
        newHostMap.put(host.toInetAddr(), host);
    }
    //聲明 修改的實例,新的服務(wù)實例,移除的服務(wù)實例
    Set<Instance> modHosts = new HashSet<Instance>();
    Set<Instance> newHosts = new HashSet<Instance>();
    Set<Instance> remvHosts = new HashSet<Instance>();
    //將新獲得的服務(wù)實例信息做成一個entry list
    List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
            newHostMap.entrySet());
    //遍歷
    for (Map.Entry<String, Instance> entry : newServiceHosts) {
        Instance host = entry.getValue();
        String key = entry.getKey();
        //Instance 的toString方法是重寫過的,如果當前服務(wù)實例和新獲得的服務(wù)實例不一樣,
        //將該服務(wù)實例信息放入 modHosts
        if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
            modHosts.add(host);
            continue;
        }
        //如果當前保存的服務(wù)實例信息中沒有新獲得的服務(wù)實例,將新獲得的服務(wù)實例放入 newHosts
        if (!oldHostMap.containsKey(key)) {
            newHosts.add(host);
        }
    }
    //遍歷當前保存的服務(wù)實例信息map
    for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
        Instance host = entry.getValue();
        String key = entry.getKey();
        //如果新的服務(wù)實例map中沒有當前保存的服務(wù)實例信息map中的實例,將其放入 remvHosts
        if (newHostMap.containsKey(key)) {
            continue;
        }
        
        if (!newHostMap.containsKey(key)) {
            remvHosts.add(host);
        }
        
    }
    //如果 newHosts 中有元素,代表服務(wù)實例發(fā)生了變更
    if (newHosts.size() > 0) {
        changed = true;
        NAMING_LOGGER
                .info("new ips(" + newHosts.size() + ") service: " + newService.getKey() + " -> " + JacksonUtils
                        .toJson(newHosts));
    }
    //如果 remvHosts 中有元素,代表服務(wù)實例發(fā)生了變更
    if (remvHosts.size() > 0) {
        changed = true;
        NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + newService.getKey() + " -> "
                + JacksonUtils.toJson(remvHosts));
    }
    //如果 modHosts 中有元素,代表服務(wù)實例發(fā)生了變更
    if (modHosts.size() > 0) {
        changed = true;
        NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + newService.getKey() + " -> "
                + JacksonUtils.toJson(modHosts));
    }
    return changed;
}

5、UpdateTask

上面 4.2.1 講到添加了一個UpdateTask 到 ScheduledFuture,然后延遲一秒鐘。下面看看這個task做了什么。

UpdateTask 是ServiceInfoUpdateService的內(nèi)部類,實現(xiàn)了Runnable接口,是一個線程。

public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private final String serviceName;
    private final String groupName;
    private final String clusters;
    private final String groupedServiceName;
    private final String serviceKey;
    //1:連不上服務(wù)端,2:獲取到的服務(wù)實例是空
    private int failCount = 0;
	//構(gòu)造器,初始化參數(shù)
    public UpdateTask(String serviceName, String groupName, String clusters) {
        this.serviceName = serviceName;
        this.groupName = groupName;
        this.clusters = clusters;
        this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
    }

    @Override
    public void run() {
        //保存一個延遲時間
        long delayTime = DEFAULT_DELAY;

        try {
            //當前沒有訂閱 并且 沒有執(zhí)行過updateTask
            if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
                NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
                return;
            }
			//獲取當前保存的服務(wù)實例信息
            ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
            //如果沒有服務(wù)實例信息
            if (serviceObj == null) {
                //從服務(wù)端查詢實例信息
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                //處理服務(wù)信息
                serviceInfoHolder.processServiceInfo(serviceObj);
                //保存lastRefTime
                lastRefTime = serviceObj.getLastRefTime();
                return;
            }
			//如果當前保存的實例信息的 LastRefTime <= UpdateTask 緩存的 lastRefTime
            //說明應(yīng)該從新拉取
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                serviceInfoHolder.processServiceInfo(serviceObj);
            }
            //保存最新的 lastRefTime
            lastRefTime = serviceObj.getLastRefTime();
            //如果當前獲取到的  增加失敗次數(shù) 最大6次
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                incFailCount();
                return;
            }
            //拉取成功,重設(shè) delayTime	delayTime * 6,重設(shè)失敗次數(shù)
            // TODO multiple time can be configured.
            delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
            resetFailCount();
        } catch (Throwable e) {
            incFailCount();
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
        } finally {
            //重設(shè)延遲時間,delayTime 左移 失敗次數(shù)位 與 60秒的較小值
            //如果拉取成功,delayTime 是 6s,failCount 是0
            //如果拉取失敗,delayTime 是 1s,failCount 是失敗次數(shù),相當于做了一個衰減重試
            //當失敗次數(shù)達到6次的時候,左移結(jié)果是64s,這時會選擇60秒作為重試的延遲時間
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }

    private void incFailCount() {
        int limit = 6;
        if (failCount == limit) {
            return;
        }
        failCount++;
    }

    private void resetFailCount() {
        failCount = 0;
    }
}
//com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}
//com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#queryInstancesOfService
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    //請求服務(wù)端獲取實例信息
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}

5 和 4.2 中 都有serviceInfoHolder.processServiceInfo(serviceObj),感覺只是為了防漏。

6、InstancesChangeEvent的監(jiān)聽

//com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#onEvent
public void onEvent(InstancesChangeEvent event) {
    String key = ServiceInfo
            .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (CollectionUtils.isEmpty(eventListeners)) {
        return;
    }
    for (final EventListener listener : eventListeners) {
        final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
        if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
            ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
        } else {
            listener.onEvent(namingEvent);
        }
    }
}
private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent(
        InstancesChangeEvent instancesChangeEvent) {
    return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
            instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
}

將InstancesChangeEvent 事件包裝成 NamingEvent,NacosWatch中start方法中包裝的NamingEvent的監(jiān)聽就會收到事件。從而更改NacosWatch#properties中Metadata的值。

7、HeartbeatEvent

該事件會在gateway中監(jiān)聽。

org.springframework.cloud.gateway.route.RouteRefreshListener#onApplicationEvent

cloud.dubbo中也會監(jiān)聽。

com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration#onHeartbeatEvent

原文鏈接:https://blog.csdn.net/xuwenjingrenca/article/details/125116719

欄目分類
最近更新