Spring Cloud Alibaba Nacos: Client-Side Service Registration, Heartbeats, and Health Checks

Author: 喜欢小苹果的码农. Updated: 2022-06-08

1. Registering the HealthIndicator

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnAvailableEndpoint
	public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosDiscoveryEndpoint(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnEnabledHealthIndicator("nacos-discovery")
	public HealthIndicator nacosDiscoveryHealthIndicator(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
		return new NacosDiscoveryHealthIndicator(
				nacosServiceManager.getNamingService(nacosProperties));
	}

}

The nacosServiceManager bean is created in NacosServiceAutoConfiguration.

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosServiceAutoConfiguration {

   @Bean
   public NacosServiceManager nacosServiceManager() {
      return new NacosServiceManager();
   }

}

As shown above, registering the HealthIndicator obtains the NamingService and uses it to construct a NacosDiscoveryHealthIndicator.

public NacosDiscoveryHealthIndicator(NamingService namingService) {
   this.namingService = namingService;
}

NacosDiscoveryHealthIndicator has a single method, which checks the health status.

protected void doHealthCheck(Health.Builder builder) throws Exception {
   // Just return "UP" or "DOWN"
   String status = namingService.getServerStatus();
   // Set the status to Builder
   builder.status(status);
   switch (status) {
   case STATUS_UP:
      builder.up();
      break;
   case STATUS_DOWN:
      builder.down();
      break;
   default:
      builder.unknown();
      break;
   }
}
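
The mapping performed by doHealthCheck can be sketched without Spring on the classpath. HealthState and ServerStatusMapper below are hypothetical names used only for illustration, and the literal values "UP" and "DOWN" are assumed from the comment in the method above.

```java
// Hypothetical stand-in for Spring Boot's health status; not the real Health.Builder API.
enum HealthState { UP, DOWN, UNKNOWN }

class ServerStatusMapper {

    // Maps the raw status string reported by the server to a coarse health state.
    // Anything other than "UP" or "DOWN" (including null) is treated as UNKNOWN.
    static HealthState fromServerStatus(String status) {
        if (status == null) {
            return HealthState.UNKNOWN;
        }
        switch (status) {
            case "UP":
                return HealthState.UP;
            case "DOWN":
                return HealthState.DOWN;
            default:
                return HealthState.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromServerStatus("UP"));
        System.out.println(fromServerStatus("STARTING"));
    }
}
```

The default branch matters: a status string the client does not recognize is reported as unknown rather than being treated as a failure.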

2. Obtaining the NamingService

//com.alibaba.cloud.nacos.NacosServiceManager#getNamingService
public NamingService getNamingService(Properties properties) {
    if (Objects.isNull(this.namingService)) {
        buildNamingService(properties);
    }
    return namingService;
}

Creating the NamingService

//com.alibaba.cloud.nacos.NacosServiceManager#buildNamingService
private NamingService buildNamingService(Properties properties) {
   if (Objects.isNull(namingService)) {
      synchronized (NacosServiceManager.class) {
         if (Objects.isNull(namingService)) {
            namingService = createNewNamingService(properties);
         }
      }
   }
   return namingService;
}
//com.alibaba.cloud.nacos.NacosServiceManager#createNewNamingService
private NamingService createNewNamingService(Properties properties) {
   try {
      return createNamingService(properties);
   }
   catch (NacosException e) {
      throw new RuntimeException(e);
   }
}
//NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
//com.alibaba.nacos.client.naming.NacosNamingService#NacosNamingService
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}
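
The two ideas in the call chain above, lazy creation guarded by a double check and construction through reflection, can be sketched together. DemoService and DemoServiceManager are hypothetical stand-ins for NamingService and NacosServiceManager. The field is declared volatile here because the double-checked idiom needs it in Java; whether the original field is volatile is not shown in the snippets above.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical service type standing in for NamingService.
class DemoService {
    final Properties properties;

    public DemoService(Properties properties) {
        this.properties = properties;
    }
}

// Hypothetical manager standing in for NacosServiceManager.
class DemoServiceManager {

    // volatile so that a fully constructed instance is visible to other threads.
    private volatile DemoService service;

    // Creates the service on first use; later calls return the same instance
    // and ignore the properties they are given.
    DemoService getService(Properties properties) {
        if (service == null) {
            synchronized (this) {
                if (service == null) {
                    service = createByReflection(DemoService.class.getName(), properties);
                }
            }
        }
        return service;
    }

    // Looks the class up by name and calls its public (Properties) constructor.
    static DemoService createByReflection(String className, Properties properties) {
        try {
            Class<?> implClass = Class.forName(className);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + className, e);
        }
    }
}
```

One consequence worth noting: because only the first call builds the instance, the properties passed by later callers have no effect.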

3. Initializing the NamingService

private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);
    
    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    // Build the ServiceInfoHolder, which runs scheduled tasks that refresh the disk cache
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
    // Create the NamingClientProxyDelegate
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}

3.1. ServiceInfoHolder

When constructed, ServiceInfoHolder initializes the disk cache directory and creates the tasks that operate on the disk cache.

public ServiceInfoHolder(String namespace, Properties properties) {
    // Initialize the disk cache directory
    initCacheDir(namespace, properties);
    // Load the cache at startup if the namingLoadCacheAtStart setting is enabled
    if (isLoadCacheAtStart(properties)) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }
    // Failover task
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    // Load the value of the namingPushEmptyProtection setting
    this.pushEmptyProtection = isPushEmptyProtect(properties);
}
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
    this.serviceInfoHolder = serviceInfoHolder;
    this.failoverDir = cacheDir + FAILOVER_DIR;
    // init executorService
    this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.failover");
            return thread;
        }
    });
    this.init();
}
public void init() {
    // Failover switch refresh: every 5 seconds
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    // Disk write: starts after 30 minutes, then once a day
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    // Backup: runs once, 10 seconds after startup
    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);
                
                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }
                
                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
            }
            
        }
    }, 10000L, TimeUnit.MILLISECONDS);
}
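
The scheduling pattern in init() mixes a repeating task with a one-shot task on a single daemon thread. The sketch below reproduces just that pattern with much shorter delays. SchedulingSketch, the thread name, and the counters are hypothetical and not part of Nacos.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulingSketch {

    // Runs a repeating task and a one-shot task on one daemon thread.
    // Returns {number of repeating runs, number of one-shot runs}.
    static int[] runDemo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("demo.failover");
            return thread;
        });
        AtomicInteger repeatingRuns = new AtomicInteger();
        AtomicInteger oneShotRuns = new AtomicInteger();
        CountDownLatch threeRepeats = new CountDownLatch(3);
        CountDownLatch oneShotDone = new CountDownLatch(1);

        // Repeats forever: the next run starts 20 ms after the previous one finishes.
        executor.scheduleWithFixedDelay(() -> {
            repeatingRuns.incrementAndGet();
            threeRepeats.countDown();
        }, 0L, 20L, TimeUnit.MILLISECONDS);

        // Runs exactly once, 30 ms from now.
        executor.schedule(() -> {
            oneShotRuns.incrementAndGet();
            oneShotDone.countDown();
        }, 30L, TimeUnit.MILLISECONDS);

        try {
            threeRepeats.await(5, TimeUnit.SECONDS);
            oneShotDone.await(5, TimeUnit.SECONDS);
            Thread.sleep(100L); // extra time in which the one-shot task must not run again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return new int[] {repeatingRuns.get(), oneShotRuns.get()};
    }
}
```

The difference between the two calls is the point: scheduleWithFixedDelay keeps rescheduling itself, while schedule fires a single time, which is why the startup backup above is not a periodic job.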

The individual task implementations are omitted here.

3.2. NamingClientProxyDelegate

public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
        InstancesChangeNotifier changeNotifier) throws NacosException {
    this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
            changeNotifier);
    this.serverListManager = new ServerListManager(properties, namespace);
    this.serviceInfoHolder = serviceInfoHolder;
    this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
    initSecurityProxy();
    // Create the NamingHttpClientProxy, used for persistent (non-ephemeral) instances
    this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
            serviceInfoHolder);
    // Create the NamingGrpcClientProxy, used for ephemeral instances
    this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
            serviceInfoHolder);
}

4. Heartbeat logic in detail

4.1. Heartbeats in NamingGrpcClientProxy

public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    super(securityProxy, properties);
    this.namespaceId = namespaceId;
    this.uuid = UUID.randomUUID().toString();
    this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
    Map<String, String> labels = new HashMap<String, String>();
    labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
    labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
    this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
    // Redo (heartbeat) service
    this.redoService = new NamingGrpcRedoService(this);
    start(serverListFactory, serviceInfoHolder);
}
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    rpcClient.serverListFactory(serverListFactory);
    rpcClient.registerConnectionListener(redoService);
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    rpcClient.start();
    NotifyCenter.registerSubscriber(this);
}

Creating the NamingGrpcRedoService

private static final String REDO_THREAD_NAME = "com.alibaba.nacos.client.naming.grpc.redo";
private static final int REDO_THREAD = 1;
private static final long DEFAULT_REDO_DELAY = 3000L;
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy) {
    // Create a single-threaded scheduled pool
    this.redoExecutor = new ScheduledThreadPoolExecutor(REDO_THREAD, new NameThreadFactory(REDO_THREAD_NAME));
    // Schedule a new RedoScheduledTask (given the clientProxy) with a fixed delay; the default delay is 3 seconds
    this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), DEFAULT_REDO_DELAY,
            DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
}

NameThreadFactory

public class NameThreadFactory implements ThreadFactory {
    
    private final AtomicInteger id = new AtomicInteger(0);
    
    private String name;
    
    public NameThreadFactory(String name) {
        if (!name.endsWith(StringUtils.DOT)) {
            name += StringUtils.DOT;
        }
        this.name = name;
    }
    
    @Override
    public Thread newThread(Runnable r) {
        String threadName = name + id.getAndIncrement();
        Thread thread = new Thread(r, threadName);
        thread.setDaemon(true);
        return thread;
    }
}

The scheduled heartbeat (redo) task

public class RedoScheduledTask extends AbstractExecuteTask {
    
    private final NamingGrpcClientProxy clientProxy;
    
    private final NamingGrpcRedoService redoService;
    
    public RedoScheduledTask(NamingGrpcClientProxy clientProxy, NamingGrpcRedoService redoService) {
        this.clientProxy = clientProxy;
        this.redoService = redoService;
    }
    
    @Override
    public void run() {
        // Not connected to the server
        if (!redoService.isConnected()) {
            LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
            return;
        }
        try {
            // Redo instance registrations
            redoForInstances();
            // Redo subscriptions
            redoForSubscribes();
        } catch (Exception e) {
            LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
        }
    }
    
    private void redoForInstances() {
        // Iterate over the instance redo data
        for (InstanceRedoData each : redoService.findInstanceRedoData()) {
            try {
                redoForInstance(each);
            } catch (NacosException e) {
                LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
                        each.getGroupName(), each.getServiceName(), e);
            }
        }
    }
    
    private void redoForInstance(InstanceRedoData redoData) throws NacosException {
        RedoData.RedoType redoType = redoData.getRedoType();
        String serviceName = redoData.getServiceName();
        String groupName = redoData.getGroupName();
        LogUtils.NAMING_LOGGER.info("Redo instance operation {} for {}@@{}", redoType, groupName, serviceName);
        switch (redoType) {
            case REGISTER:
                if (isClientDisabled()) {
                    return;
                }
                // Call the register method
                clientProxy.doRegisterService(serviceName, groupName, redoData.get());
                break;
            case UNREGISTER:
                // Skip if the client is not in the RUNNING state
                if (isClientDisabled()) {
                    return;
                }
                // Deregister the instance
                clientProxy.doDeregisterService(serviceName, groupName, redoData.get());
                break;
            case REMOVE:
                // Remove the redo data
                redoService.removeInstanceForRedo(serviceName, groupName);
                break;
            default:
        }
        
    }
    
    private void redoForSubscribes() {
        for (SubscriberRedoData each : redoService.findSubscriberRedoData()) {
            try {
                redoForSubscribe(each);
            } catch (NacosException e) {
                LogUtils.NAMING_LOGGER.error("Redo subscriber operation {} for {}@@{}#{} failed. ", each.getRedoType(),
                        each.getGroupName(), each.getServiceName(), each.get(), e);
            }
        }
    }
    
    private void redoForSubscribe(SubscriberRedoData redoData) throws NacosException {
        RedoData.RedoType redoType = redoData.getRedoType();
        String serviceName = redoData.getServiceName();
        String groupName = redoData.getGroupName();
        String cluster = redoData.get();
        LogUtils.NAMING_LOGGER.info("Redo subscriber operation {} for {}@@{}#{}", redoType, groupName, serviceName, cluster);
        switch (redoData.getRedoType()) {
            case REGISTER:
                if (isClientDisabled()) {
                    return;
                }
                clientProxy.doSubscribe(serviceName, groupName, cluster);
                break;
            case UNREGISTER:
                if (isClientDisabled()) {
                    return;
                }
                clientProxy.doUnsubscribe(serviceName, groupName, cluster);
                break;
            case REMOVE:
                redoService.removeSubscriberForRedo(redoData.getServiceName(), redoData.getGroupName(), redoData.get());
                break;
            default:
        }
    }
    
    private boolean isClientDisabled() {
        return !clientProxy.isEnable();
    }
}
public RedoType getRedoType() {
    if (isRegistered() && !isUnregistering()) {
        return RedoType.NONE;
    } else if (isRegistered() && isUnregistering()) {
        return RedoType.UNREGISTER;
    } else if (!isRegistered() && !isUnregistering()) {
        return RedoType.REGISTER;
    } else {
        return RedoType.REMOVE;
    }
}
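
getRedoType is a four-row truth table over two flags. The sketch below restates it as a standalone function so the combinations can be printed or checked. RedoKind and RedoTypeSketch are hypothetical names chosen to avoid suggesting they are the Nacos types.

```java
// Hypothetical mirror of the four redo outcomes used above.
enum RedoKind { NONE, REGISTER, UNREGISTER, REMOVE }

class RedoTypeSketch {

    // registered:    the server currently knows about this entry
    // unregistering: the client has asked for the entry to be removed
    static RedoKind redoKind(boolean registered, boolean unregistering) {
        if (registered) {
            // Known to the server: either nothing to do, or finish the removal.
            return unregistering ? RedoKind.UNREGISTER : RedoKind.NONE;
        }
        // Not known to the server: either drop the local record, or register again.
        return unregistering ? RedoKind.REMOVE : RedoKind.REGISTER;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean registered : flags) {
            for (boolean unregistering : flags) {
                System.out.printf("registered=%b unregistering=%b -> %s%n",
                        registered, unregistering, redoKind(registered, unregistering));
            }
        }
    }
}
```

The row that drives reconnection recovery is registered=false with unregistering=false, which yields REGISTER.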

So when is registered on the redoData reset to false?

public class NamingGrpcRedoService implements ConnectionEventListener

NamingGrpcRedoService implements the ConnectionEventListener interface.

public void onDisConnect() {
    connected = false;
    LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
    synchronized (registeredInstances) {
        registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
    }
    synchronized (subscribes) {
        subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
    }
    LogUtils.NAMING_LOGGER.warn("mark to redo completed");
}

When a com.alibaba.nacos.common.remote.client.RpcClient.ConnectionEvent with eventType == DISCONNECTED is observed, this method is called back and sets registered to false on every redoData entry, so that the next run of the redo task re-registers the instances and re-subscribes.
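
The interplay between the disconnect callback and the periodic redo task can be simulated in a few lines. RedoRegistrySketch is a hypothetical, heavily simplified model: it tracks one boolean per service name and pretends that every re-registration succeeds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RedoRegistrySketch {

    // service name -> "the server currently knows about this registration"
    private final Map<String, Boolean> registered = new LinkedHashMap<>();

    synchronized void markRegistered(String serviceName) {
        registered.put(serviceName, true);
    }

    // Mirrors the disconnect callback: every entry is flagged for redo.
    synchronized void onDisconnect() {
        registered.replaceAll((name, flag) -> false);
    }

    // One pass of the periodic redo task: re-register everything flagged false
    // and return the names that were redone.
    synchronized List<String> runRedoOnce() {
        List<String> redone = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : registered.entrySet()) {
            if (!entry.getValue()) {
                entry.setValue(true); // assumed to follow a successful register request
                redone.add(entry.getKey());
            }
        }
        return redone;
    }
}
```

A pass of the task is a no-op while the connection is healthy, does real work once after a disconnect, and then goes quiet again.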

When NamingGrpcClientProxy is constructed, it calls the start method of rpcClient, which builds a thread pool. Read the method's source for the full details.

//com.alibaba.nacos.common.remote.client.RpcClient#start
public final void start() throws NacosException {
	...
    // Another thread pool is built here.
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        }
    });
    
    // connection event consumer.
    // Take ConnectionEvents from eventLinkedBlockingQueue and act on them
    clientEventExecutor.submit(new Runnable() {
        @Override
        public void run() {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = null;
                try {
                    take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
                } catch (Throwable e) {
                    //Do nothing
                }
            }
        }
    });
    // This thread decides whether a reconnect is needed; reconnecting publishes a ConnectionEvent DISCONNECTED
    // Detailed logic omitted
    clientEventExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
							...
                        }
                        reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                    } catch (Throwable throwable) {
                        //Do nothing
                    }
                }
            }
        });
    ...
}

The detailed logic is omitted here.

//com.alibaba.nacos.common.remote.client.RpcClient#reconnect
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
    
    ...
        if (connectionNew != null) {
            ...
            if (currentConnection != null) {
                ...
                        closeConnection(currentConnection);
            }
            ...
        }
        if (isShutdown()) {
            closeConnection(currentConnection);
        }
    ...
}

The closeConnection method adds a ConnectionEvent.DISCONNECTED event to eventLinkedBlockingQueue.

com.alibaba.nacos.common.remote.client.RpcClient#shutdown also calls this method.

//com.alibaba.nacos.common.remote.client.RpcClient#closeConnection
private void closeConnection(Connection connection) {
    if (connection != null) {
        connection.close();
        eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
    }
}
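
The queue-based hand-off between closeConnection and the event consumer in start() is an ordinary producer/consumer arrangement. Here is a minimal sketch of it, assuming integer event codes. The constants, class name, and notification strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ConnectionEventSketch {

    // Hypothetical event codes; the real values are not shown in the article.
    static final int CONNECTED = 1;
    static final int DISCONNECTED = 0;

    // Publishes the given events to a queue and returns the callbacks that a
    // single consumer thread invoked, in order.
    static List<String> publishAndConsume(int... events) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<String> notifications = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(events.length);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int event = queue.take(); // blocks until an event arrives
                    notifications.add(event == CONNECTED ? "onConnected" : "onDisConnect");
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop consuming
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        for (int event : events) {
            queue.add(event); // producer side, as closeConnection does
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        consumer.interrupt();
        return new ArrayList<>(notifications);
    }
}
```

Because the producer only enqueues, closing a connection never blocks on listener code; the listeners run on the consumer thread.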

4.2. NamingHttpClientProxy

NamingHttpClientProxy has two thread pools: one for heartbeats and one for receiving pushes from the server.

public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
        Properties properties, ServiceInfoHolder serviceInfoHolder) {
    super(securityProxy, properties);
    this.serverListManager = serverListManager;
    this.setServerPort(DEFAULT_SERVER_PORT);
    this.namespaceId = namespaceId;
    // Create the heartbeat reactor
    this.beatReactor = new BeatReactor(this, properties);
    // Receives push messages from the server
    this.pushReceiver = new PushReceiver(serviceInfoHolder);
    this.maxRetry = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_REQUEST_DOMAIN_RETRY_COUNT,
            String.valueOf(UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT)));
}

4.2.1. Heartbeats in NamingHttpClientProxy

BeatReactor

public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
    this.serverProxy = serverProxy;
    int threadCount = initClientBeatThreadCount(properties);
    this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}
public static final int DEFAULT_CLIENT_BEAT_THREAD_COUNT =
            ThreadUtils.getSuitableThreadCount(1) > 1 ? ThreadUtils.getSuitableThreadCount(1) / 2 : 1;
private int initClientBeatThreadCount(Properties properties) {
    if (properties == null) {
        return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
    }
	//NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
    return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
                              UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
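
How the thread count is resolved from configuration can be sketched as a parse-with-fallback. The key "namingClientBeatThreadCount" comes from the comment above. The helper parseOrDefault is hypothetical, and its fallback on blank or non-numeric input is an assumption about how ConvertUtils.toInt behaves, not a copy of it.

```java
import java.util.Properties;

class BeatThreadCountSketch {

    // Property key quoted in the comment above.
    static final String KEY = "namingClientBeatThreadCount";

    // Hypothetical helper: returns defaultValue when the text is missing,
    // blank, or not a valid integer.
    static int parseOrDefault(String value, int defaultValue) {
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Resolves the beat thread count, tolerating a null Properties object.
    static int beatThreadCount(Properties properties, int defaultCount) {
        if (properties == null) {
            return defaultCount;
        }
        return parseOrDefault(properties.getProperty(KEY), defaultCount);
    }
}
```

For example, a Properties object containing namingClientBeatThreadCount=4 resolves to 4, while a missing or malformed value falls back to the default passed in.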

Adding a heartbeat

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // Add the heartbeat info
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    ...

Building the heartbeat info

public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    BeatInfo beatInfo = new BeatInfo();
    beatInfo.setServiceName(groupedServiceName);
    beatInfo.setIp(instance.getIp());
    beatInfo.setPort(instance.getPort());
    beatInfo.setCluster(instance.getClusterName());
    beatInfo.setWeight(instance.getWeight());
    beatInfo.setMetadata(instance.getMetadata());
    beatInfo.setScheduled(false);
    beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    return beatInfo;
}
//com.alibaba.nacos.api.naming.pojo.Instance#getInstanceHeartBeatInterval
//public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
public long getInstanceHeartBeatInterval() {
    //HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
                                       Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
private long getMetaDataByKeyWithDefault(final String key, final long defaultValue) {
    if (getMetadata() == null || getMetadata().isEmpty()) {
        return defaultValue;
    }
    final String value = getMetadata().get(key);
    if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {
        return Long.parseLong(value);
    }
    return defaultValue;
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // Schedule the first beat
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
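
The heartbeat period is read from instance metadata with a default, as getInstanceHeartBeatInterval shows. A standalone sketch of that lookup follows. The key and the 5-second default are taken from the comments above; the digit pattern is an assumption, since the value of NUMBER_PATTERN is not shown.

```java
import java.util.Map;

class HeartBeatIntervalSketch {

    // Metadata key and default quoted in the comments above.
    static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
    static final long DEFAULT_HEART_BEAT_INTERVAL_MS = 5000L;

    // Assumption: plain decimal digits, capped at 18 so Long.parseLong cannot overflow.
    static final String NUMBER_PATTERN = "\\d{1,18}";

    // Returns the interval in milliseconds, falling back to the default when the
    // metadata is absent or the value is not a plain number.
    static long heartBeatInterval(Map<String, String> metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return DEFAULT_HEART_BEAT_INTERVAL_MS;
        }
        String value = metadata.get(HEART_BEAT_INTERVAL);
        if (value != null && !value.isEmpty() && value.matches(NUMBER_PATTERN)) {
            return Long.parseLong(value);
        }
        return DEFAULT_HEART_BEAT_INTERVAL_MS;
    }
}
```

In practice this means an instance can slow down or speed up its own heartbeat by setting one metadata entry, with no client-wide configuration change.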

BeatTask is an inner class of BeatReactor.

class BeatTask implements Runnable {
    
    BeatInfo beatInfo;
    
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    
    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

        } catch (Exception unknownEx) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                    JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
        } finally {
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
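
BeatTask does not use a fixed-rate schedule. Each run schedules the next one in its finally block, with a delay the server may override. The sketch below isolates that self-rescheduling pattern. The server's answers are simulated by an array, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SelfReschedulingSketch {

    // Runs one beat per entry of serverIntervalsMs and returns the delay that
    // each beat chose for its successor.
    static List<Long> run(long periodMs, long[] serverIntervalsMs) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        });
        List<Long> delaysUsed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(serverIntervalsMs.length);

        class Beat implements Runnable {
            private final int round;

            Beat(int round) {
                this.round = round;
            }

            @Override
            public void run() {
                long nextDelay = periodMs; // default: the configured period
                try {
                    long interval = serverIntervalsMs[round]; // simulated server reply
                    if (interval > 0) {
                        nextDelay = interval; // the server overrides the period
                    }
                } finally {
                    delaysUsed.add(nextDelay);
                    done.countDown();
                    if (round + 1 < serverIntervalsMs.length) {
                        // Reschedule even if the body above had failed.
                        executor.schedule(new Beat(round + 1), nextDelay, TimeUnit.MILLISECONDS);
                    }
                }
            }
        }

        executor.schedule(new Beat(0), periodMs, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(delaysUsed);
    }
}
```

Rescheduling from finally is the design choice to notice: a failed beat still queues the next one, so a transient error does not silence the heartbeat.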

Sending the request

//com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#sendBeat
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put(IP_PARAM, beatInfo.getIp());
    params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
    // reqApi is the same as in the previous article
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

4.2.2. PushReceiver: receiving push messages from the server

public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
    try {
        this.serviceInfoHolder = serviceInfoHolder;
        // Get the UDP port
        String udpPort = getPushReceiverUdpPort();
        // Create the UDP socket
        if (StringUtils.isEmpty(udpPort)) {
            this.udpSocket = new DatagramSocket();
        } else {
            this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
        }
        // Build the thread pool
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
        // Start the receive loop
        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

PushReceiver implements the Runnable interface.

//com.alibaba.nacos.client.naming.core.PushReceiver#run
public void run() {
    while (!closed) {
        try {
            
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Receive the UDP data
            udpSocket.receive(packet);
            // Decode the received UDP data into a JSON string
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            // Convert the JSON into a PushPacket
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            // Handle the packet according to pushPacket.type and build the ack
            if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
                // dom or service: process the service instance change
                serviceInfoHolder.processServiceInfo(pushPacket.data);
                // Reply with an ack that carries no instance data
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
                // dump request: return the service instance info held by this client
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                        + "\"}";
            } else {
                // Reply with an ack that carries no instance data
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            // Send the response to the server
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
public ServiceInfo processServiceInfo(String json) {
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    serviceInfo.setJsonFromServer(json);
    // Handles the service instance change itself; the next article covers the logic
    return processServiceInfo(serviceInfo);
}
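
The ack strings built in run() follow one template with a type that depends on the incoming packet. The sketch below factors that out. The literal packet types "dom", "service", and "dump" are assumptions about the constants' values, and PushAckSketch is a hypothetical helper, not part of the client.

```java
class PushAckSketch {

    // Chooses the ack type for an incoming packet type.
    // Assumption: the constants referenced above hold "dom", "service", and "dump".
    static String ackTypeFor(String packetType) {
        if ("dom".equals(packetType) || "service".equals(packetType)) {
            return "push-ack";
        }
        if ("dump".equals(packetType)) {
            return "dump-ack";
        }
        return "unknown-ack";
    }

    // Builds the ack JSON by string concatenation, as the loop above does.
    // escapedData must already be escaped for embedding in a JSON string.
    static String buildAck(String ackType, long lastRefTime, String escapedData) {
        return "{\"type\": \"" + ackType + "\""
                + ", \"lastRefTime\":\"" + lastRefTime + "\""
                + ", \"data\":\"" + escapedData + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(buildAck(ackTypeFor("service"), 123L, ""));
    }
}
```

Only the dump case carries a payload; push and unknown acks send an empty data field, which keeps the reply small enough for a single UDP datagram.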

5. Registration order of some Spring Cloud Nacos beans

HealthIndicator
NacosWatch
NacosAutoServiceRegistration
So by the time NacosWatch and NacosAutoServiceRegistration are registered, the HealthIndicator has already initialized the NamingService.

Original link: https://blog.csdn.net/xuwenjingrenca/article/details/125083737
