Spring Cloud Alibaba Nacos: Client Service Registration, Heartbeat, and Health Monitoring
Author: 喜歡小蘋果的碼農 Updated: 2022-06-08
1. Registering the HealthIndicator
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnAvailableEndpoint
public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosDiscoveryEndpoint(nacosServiceManager, nacosDiscoveryProperties);
}
@Bean
@ConditionalOnEnabledHealthIndicator("nacos-discovery")
public HealthIndicator nacosDiscoveryHealthIndicator(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
return new NacosDiscoveryHealthIndicator(
nacosServiceManager.getNamingService(nacosProperties));
}
}
The nacosServiceManager bean is created in NacosServiceAutoConfiguration.
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosServiceAutoConfiguration {
@Bean
public NacosServiceManager nacosServiceManager() {
return new NacosServiceManager();
}
}
As shown above, registering the HealthIndicator obtains a NamingService and uses it to construct a NacosDiscoveryHealthIndicator.
public NacosDiscoveryHealthIndicator(NamingService namingService) {
this.namingService = namingService;
}
NacosDiscoveryHealthIndicator has a single method, which checks the health status.
protected void doHealthCheck(Health.Builder builder) throws Exception {
// Just return "UP" or "DOWN"
String status = namingService.getServerStatus();
// Set the status to Builder
builder.status(status);
switch (status) {
case STATUS_UP:
builder.up();
break;
case STATUS_DOWN:
builder.down();
break;
default:
builder.unknown();
break;
}
}
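The mapping in doHealthCheck can be tried out without Spring on the classpath. The following is a minimal sketch, assuming only what the comment in the method states (the server status is the string "UP" or "DOWN"); the class ServerStatusSketch and its Health enum are hypothetical names, not part of Nacos or Spring Boot.

```java
final class ServerStatusSketch {
    // Hypothetical stand-in for the three outcomes the health builder can take.
    enum Health { UP, DOWN, UNKNOWN }

    // Mirrors the switch above: only the exact strings "UP" and "DOWN" are recognized.
    static Health toHealth(String serverStatus) {
        if ("UP".equals(serverStatus)) {
            return Health.UP;
        }
        if ("DOWN".equals(serverStatus)) {
            return Health.DOWN;
        }
        // Sketch choice: anything else, including null, is reported as unknown.
        return Health.UNKNOWN;
    }

    public static void main(String[] args) {
        for (String status : new String[] {"UP", "DOWN", "STARTING"}) {
            System.out.println(status + " -> " + toHealth(status));
        }
    }
}
```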
2. Obtaining the NamingService
//com.alibaba.cloud.nacos.NacosServiceManager#getNamingService
public NamingService getNamingService(Properties properties) {
if (Objects.isNull(this.namingService)) {
buildNamingService(properties);
}
return namingService;
}
Creating the NamingService
//com.alibaba.cloud.nacos.NacosServiceManager#buildNamingService
private NamingService buildNamingService(Properties properties) {
if (Objects.isNull(namingService)) {
synchronized (NacosServiceManager.class) {
if (Objects.isNull(namingService)) {
namingService = createNewNamingService(properties);
}
}
}
return namingService;
}
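buildNamingService uses the double-checked locking idiom: check without a lock, then check again under the lock before creating the instance. Below is a minimal, self-contained sketch of that idiom. LazyHolderSketch and countFactoryCalls are hypothetical names, and the sketch declares the field volatile because the idiom needs it; the excerpt above does not show how the original field is declared.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazyHolderSketch<T> {
    private final Supplier<T> factory;
    // volatile is required for double-checked locking to publish the instance safely.
    private volatile T instance;

    LazyHolderSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;               // first check, no lock
        if (local == null) {
            synchronized (this) {
                local = instance;         // second check, under the lock
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    // Calls get() from several threads and reports how often the factory ran.
    static int countFactoryCalls(int threadCount) {
        AtomicInteger created = new AtomicInteger();
        LazyHolderSketch<Object> holder = new LazyHolderSketch<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(holder::get);
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("factory calls: " + countFactoryCalls(8));
    }
}
```

However many threads race into get(), the factory should run only once, which is why a single NamingService ends up shared by every caller.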
//com.alibaba.cloud.nacos.NacosServiceManager#createNewNamingService
private NamingService createNewNamingService(Properties properties) {
try {
return createNamingService(properties);
}
catch (NacosException e) {
throw new RuntimeException(e);
}
}
//com.alibaba.nacos.api.NacosFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
//com.alibaba.nacos.api.naming.NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
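The factory above locates the implementation by class name and invokes its Properties constructor reflectively, so the API module needs no compile-time dependency on the client module. A minimal sketch of that technique follows; DemoService and the "serverAddr" key are hypothetical stand-ins for illustration and are not the Nacos classes.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

final class ReflectiveFactorySketch {
    // Hypothetical implementation class exposing a public Properties constructor.
    public static final class DemoService {
        final String serverAddr;

        public DemoService(Properties properties) {
            this.serverAddr = properties.getProperty("serverAddr", "unset");
        }
    }

    // Loads the class by name and calls its (Properties) constructor.
    static Object create(String className, Properties properties) {
        try {
            Class<?> implClass = Class.forName(className);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // Wrap every reflective failure in one exception type, as the factory above does.
            throw new IllegalStateException("cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "127.0.0.1:8848");
        DemoService service =
                (DemoService) create(DemoService.class.getName(), properties);
        System.out.println(service.serverAddr);
    }
}
```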
//com.alibaba.nacos.client.naming.NacosNamingService#NacosNamingService
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
3. Initializing the NamingService
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
// Build the ServiceInfoHolder; it owns scheduled tasks that maintain the disk cache
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
// Create the NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
3.1. ServiceInfoHolder
When constructed, ServiceInfoHolder initializes the disk cache directory and creates the tasks that operate on the disk cache.
public ServiceInfoHolder(String namespace, Properties properties) {
// Initialize the disk cache directory
initCacheDir(namespace, properties);
// Decide whether to load the cache at startup (the namingLoadCacheAtStart property)
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
// Failover task
this.failoverReactor = new FailoverReactor(this, cacheDir);
// Load the value of the namingPushEmptyProtection property
this.pushEmptyProtection = isPushEmptyProtect(properties);
}
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
this.serviceInfoHolder = serviceInfoHolder;
this.failoverDir = cacheDir + FAILOVER_DIR;
// init executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.failover");
return thread;
}
});
this.init();
}
public void init() {
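// Failover switch refresh: every 5 seconds
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
// Disk write: starts after 30 minutes, then once a day
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
// Backup: runs once, 10 seconds after startup (schedule is one-shot, not periodic)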
// backup file on startup if failover directory is empty.
executorService.schedule(new Runnable() {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
}
}
}, 10000L, TimeUnit.MILLISECONDS);
}
The individual task implementations are not reproduced here.
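FailoverReactor#init mixes two scheduling calls: scheduleWithFixedDelay repeats a task, while schedule runs it once after a delay. The sketch below contrasts the two on a single daemon thread, with millisecond delays chosen only to keep the demo short; SchedulingSketch and runDemo are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SchedulingSketch {
    // Returns {runs of the repeating task, runs of the one-shot task}.
    static int[] runDemo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r, "sketch.failover");
            thread.setDaemon(true);
            return thread;
        });
        AtomicInteger repeating = new AtomicInteger();
        AtomicInteger oneShot = new AtomicInteger();
        CountDownLatch threeRepeats = new CountDownLatch(3);
        CountDownLatch oneShotDone = new CountDownLatch(1);

        // Repeats: the next run starts 20 ms after the previous run finishes.
        executor.scheduleWithFixedDelay(() -> {
            repeating.incrementAndGet();
            threeRepeats.countDown();
        }, 0L, 20L, TimeUnit.MILLISECONDS);

        // One-shot: runs a single time, 50 ms from now.
        executor.schedule(() -> {
            oneShot.incrementAndGet();
            oneShotDone.countDown();
        }, 50L, TimeUnit.MILLISECONDS);

        try {
            threeRepeats.await(5, TimeUnit.SECONDS);
            oneShotDone.await(5, TimeUnit.SECONDS);
            Thread.sleep(100L); // leave time to observe that the one-shot task does not repeat
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new int[] {repeating.get(), oneShot.get()};
    }

    public static void main(String[] args) {
        int[] runs = runDemo();
        System.out.println("repeating=" + runs[0] + ", oneShot=" + runs[1]);
    }
}
```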
3.2. NamingClientProxyDelegate
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
InstancesChangeNotifier changeNotifier) throws NacosException {
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
initSecurityProxy();
// Create the NamingHttpClientProxy, used for non-ephemeral (persistent) instances
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
// Create the NamingGrpcClientProxy, used for ephemeral instances
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
4. Heartbeat logic in detail
4.1. Heartbeat in NamingGrpcClientProxy
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
super(securityProxy, properties);
this.namespaceId = namespaceId;
this.uuid = UUID.randomUUID().toString();
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
// redo service (heartbeat)
this.redoService = new NamingGrpcRedoService(this);
start(serverListFactory, serviceInfoHolder);
}
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.serverListFactory(serverListFactory);
rpcClient.registerConnectionListener(redoService);
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.start();
NotifyCenter.registerSubscriber(this);
}
Creating the NamingGrpcRedoService
private static final String REDO_THREAD_NAME = "com.alibaba.nacos.client.naming.grpc.redo";
private static final int REDO_THREAD = 1;
private static final long DEFAULT_REDO_DELAY = 3000L;
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy) {
// Create a single-threaded scheduled pool
this.redoExecutor = new ScheduledThreadPoolExecutor(REDO_THREAD, new NameThreadFactory(REDO_THREAD_NAME));
// Runs with a fixed delay (3 seconds by default) and receives clientProxy; a new RedoScheduledTask is created here
this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), DEFAULT_REDO_DELAY,
DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
}
NameThreadFactory
public class NameThreadFactory implements ThreadFactory {
private final AtomicInteger id = new AtomicInteger(0);
private String name;
public NameThreadFactory(String name) {
if (!name.endsWith(StringUtils.DOT)) {
name += StringUtils.DOT;
}
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
String threadName = name + id.getAndIncrement();
Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
}
The scheduled heartbeat task
public class RedoScheduledTask extends AbstractExecuteTask {
private final NamingGrpcClientProxy clientProxy;
private final NamingGrpcRedoService redoService;
public RedoScheduledTask(NamingGrpcClientProxy clientProxy, NamingGrpcRedoService redoService) {
this.clientProxy = clientProxy;
this.redoService = redoService;
}
@Override
public void run() {
// Not connected to the server
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
// Registrations
redoForInstances();
// Subscriptions
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
private void redoForInstances() {
// Iterate over the instance redo data
for (InstanceRedoData each : redoService.findInstanceRedoData()) {
try {
redoForInstance(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), e);
}
}
}
private void redoForInstance(InstanceRedoData redoData) throws NacosException {
RedoData.RedoType redoType = redoData.getRedoType();
String serviceName = redoData.getServiceName();
String groupName = redoData.getGroupName();
LogUtils.NAMING_LOGGER.info("Redo instance operation {} for {}@@{}", redoType, groupName, serviceName);
switch (redoType) {
case REGISTER:
if (isClientDisabled()) {
return;
}
// Call the register method
clientProxy.doRegisterService(serviceName, groupName, redoData.get());
break;
case UNREGISTER:
// Not in the RUNNING state
if (isClientDisabled()) {
return;
}
// Take the service offline (deregister)
clientProxy.doDeregisterService(serviceName, groupName, redoData.get());
break;
case REMOVE:
// Remove the heartbeat (redo) entry
redoService.removeInstanceForRedo(serviceName, groupName);
break;
default:
}
}
private void redoForSubscribes() {
for (SubscriberRedoData each : redoService.findSubscriberRedoData()) {
try {
redoForSubscribe(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo subscriber operation {} for {}@@{}#{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), each.get(), e);
}
}
}
private void redoForSubscribe(SubscriberRedoData redoData) throws NacosException {
RedoData.RedoType redoType = redoData.getRedoType();
String serviceName = redoData.getServiceName();
String groupName = redoData.getGroupName();
String cluster = redoData.get();
LogUtils.NAMING_LOGGER.info("Redo subscriber operation {} for {}@@{}#{}", redoType, groupName, serviceName, cluster);
switch (redoData.getRedoType()) {
case REGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doSubscribe(serviceName, groupName, cluster);
break;
case UNREGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doUnsubscribe(serviceName, groupName, cluster);
break;
case REMOVE:
redoService.removeSubscriberForRedo(redoData.getServiceName(), redoData.getGroupName(), redoData.get());
break;
default:
}
}
private boolean isClientDisabled() {
return !clientProxy.isEnable();
}
}
public RedoType getRedoType() {
if (isRegistered() && !isUnregistering()) {
return RedoType.NONE;
} else if (isRegistered() && isUnregistering()) {
return RedoType.UNREGISTER;
} else if (!isRegistered() && !isUnregistering()) {
return RedoType.REGISTER;
} else {
return RedoType.REMOVE;
}
}
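getRedoType is a truth table over two flags, registered and unregistering. The sketch below restates it as a pure function so all four combinations can be checked at a glance; RedoStateSketch is a hypothetical name, and the enum only mirrors the constants used in the code above.

```java
final class RedoStateSketch {
    enum RedoType { NONE, REGISTER, UNREGISTER, REMOVE }

    // Same decision table as getRedoType above, written as nested conditionals.
    static RedoType redoType(boolean registered, boolean unregistering) {
        if (registered) {
            // Known to the server: either nothing to do or a pending deregistration.
            return unregistering ? RedoType.UNREGISTER : RedoType.NONE;
        }
        // Not known to the server: either register again or drop the local record.
        return unregistering ? RedoType.REMOVE : RedoType.REGISTER;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean registered : flags) {
            for (boolean unregistering : flags) {
                System.out.println("registered=" + registered
                        + ", unregistering=" + unregistering
                        + " -> " + redoType(registered, unregistering));
            }
        }
    }
}
```

Read this way, the scheduled task does real work only when registered is false or unregistering is true.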
When does redoData get its registered flag reset to false?
public class NamingGrpcRedoService implements ConnectionEventListener
NamingGrpcRedoService implements the ConnectionEventListener interface.
public void onDisConnect() {
connected = false;
LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
synchronized (registeredInstances) {
registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
}
synchronized (subscribes) {
subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
}
LogUtils.NAMING_LOGGER.warn("mark to redo completed");
}
When a com.alibaba.nacos.common.remote.client.RpcClient.ConnectionEvent with eventType == DISCONNECTED is observed, this method is called back and sets registered to false on every redoData, which lets the scheduled task re-register and re-subscribe.
Constructing NamingGrpcClientProxy calls rpcClient's start method, which builds a thread pool. The full method is worth reading in the source for more detail.
//com.alibaba.nacos.common.remote.client.RpcClient#start
public final void start() throws NacosException {
...
// Another thread pool is built here.
clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
}
});
// connection event consumer.
// Take ConnectionEvent objects from eventLinkedBlockingQueue and act on them
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take = null;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
//Do nothing
}
}
}
});
// This thread decides whether a reconnect is needed; reconnecting publishes a ConnectionEvent DISCONNECTED
// The detailed logic is omitted
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
...
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
//Do nothing
}
}
}
});
...
}
The detailed logic is not reproduced here.
//com.alibaba.nacos.common.remote.client.RpcClient#reconnect
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
...
if (connectionNew != null) {
...
if (currentConnection != null) {
...
closeConnection(currentConnection);
}
...
}
if (isShutdown()) {
closeConnection(currentConnection);
}
...
}
closeConnection adds a ConnectionEvent.DISCONNECTED to eventLinkedBlockingQueue.
com.alibaba.nacos.common.remote.client.RpcClient#shutdown also calls this method.
//com.alibaba.nacos.common.remote.client.RpcClient#closeConnection
private void closeConnection(Connection connection) {
if (connection != null) {
connection.close();
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
}
}
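The pieces above form a producer/consumer chain: closeConnection enqueues a DISCONNECTED event, a worker thread takes it from the blocking queue, and the listener reacts by marking its data for redo. The sketch below shows that flow with plain JDK classes. ConnectionEventSketch, Listener, and registeredAfterDisconnect are hypothetical names, and a single AtomicBoolean stands in for the redo data.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ConnectionEventSketch {
    enum EventType { CONNECTED, DISCONNECTED }

    interface Listener {
        void onConnected();
        void onDisConnect();
    }

    private final BlockingQueue<EventType> events = new LinkedBlockingQueue<>();
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void register(Listener listener) {
        listeners.add(listener);
    }

    // Producer side: only enqueue the event, never call listeners directly.
    void publish(EventType type) {
        events.add(type);
    }

    // Consumer side: block on the queue, then notify every listener.
    Thread startConsumer() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    EventType type = events.take();
                    for (Listener listener : listeners) {
                        if (type == EventType.CONNECTED) {
                            listener.onConnected();
                        } else {
                            listener.onDisConnect();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop consuming
            }
        }, "sketch.remote.worker");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Publishes one DISCONNECTED event and reports the "registered" flag afterwards.
    static boolean registeredAfterDisconnect() {
        ConnectionEventSketch client = new ConnectionEventSketch();
        AtomicBoolean registered = new AtomicBoolean(true);
        CountDownLatch notified = new CountDownLatch(1);
        client.register(new Listener() {
            @Override
            public void onConnected() {
            }

            @Override
            public void onDisConnect() {
                registered.set(false); // mark for redo, like setRegistered(false)
                notified.countDown();
            }
        });
        Thread worker = client.startConsumer();
        client.publish(EventType.DISCONNECTED);
        try {
            notified.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.interrupt();
        }
        return registered.get();
    }

    public static void main(String[] args) {
        System.out.println("registered after disconnect: " + registeredAfterDisconnect());
    }
}
```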
4.2. NamingHttpClientProxy
NamingHttpClientProxy has two thread pools: one for heartbeats and one for receiving pushes from the server.
public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
Properties properties, ServiceInfoHolder serviceInfoHolder) {
super(securityProxy, properties);
this.serverListManager = serverListManager;
this.setServerPort(DEFAULT_SERVER_PORT);
this.namespaceId = namespaceId;
// Create the heartbeat reactor
this.beatReactor = new BeatReactor(this, properties);
// Receives push messages from the server
this.pushReceiver = new PushReceiver(serviceInfoHolder);
this.maxRetry = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_REQUEST_DOMAIN_RETRY_COUNT,
String.valueOf(UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT)));
}
4.2.1. Heartbeat in NamingHttpClientProxy
BeatReactor
public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
this.serverProxy = serverProxy;
int threadCount = initClientBeatThreadCount(properties);
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
public static final int DEFAULT_CLIENT_BEAT_THREAD_COUNT =
ThreadUtils.getSuitableThreadCount(1) > 1 ? ThreadUtils.getSuitableThreadCount(1) / 2 : 1;
private int initClientBeatThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
}
//NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
Adding a heartbeat
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// Add the heartbeat info
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
...
Building the heartbeat info
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
//com.alibaba.nacos.api.naming.pojo.Instance#getInstanceHeartBeatInterval
//public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
public long getInstanceHeartBeatInterval() {
//HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
private long getMetaDataByKeyWithDefault(final String key, final long defaultValue) {
if (getMetadata() == null || getMetadata().isEmpty()) {
return defaultValue;
}
final String value = getMetadata().get(key);
if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {
return Long.parseLong(value);
}
return defaultValue;
}
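The heartbeat period is therefore read from instance metadata and falls back to 5 seconds. The sketch below reproduces that lookup as a standalone function. HeartbeatIntervalSketch is a hypothetical name, and the digits-only regular expression is an assumption about what NUMBER_PATTERN contains, since the excerpt does not show its value.

```java
import java.util.HashMap;
import java.util.Map;

final class HeartbeatIntervalSketch {
    static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
    static final long DEFAULT_HEART_BEAT_INTERVAL = 5000L; // 5 seconds in milliseconds
    // Assumption: NUMBER_PATTERN accepts strings made of digits only.
    static final String NUMBER_PATTERN = "^\\d+$";

    // Returns the interval from metadata, or the default when missing or not numeric.
    static long heartBeatInterval(Map<String, String> metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return DEFAULT_HEART_BEAT_INTERVAL;
        }
        String value = metadata.get(HEART_BEAT_INTERVAL);
        if (value != null && !value.isEmpty() && value.matches(NUMBER_PATTERN)) {
            return Long.parseLong(value);
        }
        return DEFAULT_HEART_BEAT_INTERVAL;
    }

    public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put(HEART_BEAT_INTERVAL, "3000");
        System.out.println(heartBeatInterval(metadata));
        metadata.put(HEART_BEAT_INTERVAL, "3s");
        System.out.println(heartBeatInterval(metadata));
    }
}
```

A value such as "3s" silently falls back to the default rather than failing, so a mistyped interval is easy to miss.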
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// Schedule the first beat
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
BeatTask is an inner class of BeatReactor.
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
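BeatTask is a one-shot task that reschedules itself in its finally block, which lets the server change the interval from one beat to the next. The sketch below isolates that pattern. SelfReschedulingBeatSketch is a hypothetical name, the LongSupplier is a stand-in for sendBeat returning the server-suggested interval, and the loop stops after a fixed number of beats instead of checking a stopped flag.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class SelfReschedulingBeatSketch {
    // A positive server-suggested interval overrides the configured period.
    static long nextDelay(long periodMillis, long serverIntervalMillis) {
        return serverIntervalMillis > 0 ? serverIntervalMillis : periodMillis;
    }

    // Sends `beats` heartbeats and returns the delay chosen after each one.
    static List<Long> run(long periodMillis, LongSupplier fakeServer, int beats) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "sketch.beat.sender");
            thread.setDaemon(true);
            return thread;
        });
        List<Long> delays = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(beats);
        Runnable[] task = new Runnable[1]; // holder so the lambda can refer to itself
        task[0] = () -> {
            long next = periodMillis;
            try {
                next = nextDelay(periodMillis, fakeServer.getAsLong());
            } catch (RuntimeException e) {
                // A failed beat keeps the configured period.
            } finally {
                delays.add(next);
                done.countDown();
                if (done.getCount() > 0) {
                    // Reschedule from finally so a failure never ends the heartbeat.
                    executor.schedule(task[0], next, TimeUnit.MILLISECONDS);
                }
            }
        };
        executor.schedule(task[0], periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(delays);
    }

    public static void main(String[] args) {
        System.out.println(run(10L, () -> 20L, 3));
    }
}
```

Compared with scheduleWithFixedDelay, rescheduling by hand costs a few more lines but allows a different delay for every run.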
Sending the request
//com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#sendBeat
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put(IP_PARAM, beatInfo.getIp());
params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
// reqApi is the same as in the previous article
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
4.2.2. PushReceiver: receiving push messages from the server
public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
try {
this.serviceInfoHolder = serviceInfoHolder;
// Get the UDP port
String udpPort = getPushReceiverUdpPort();
// Create the UDP socket
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
// Build the thread pool
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
// Start the receive loop
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
PushReceiver implements the Runnable interface.
//com.alibaba.nacos.client.naming.core.PushReceiver#run
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// Receive UDP data
udpSocket.receive(packet);
// Decode the received UDP data into a JSON string
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
// Parse the JSON into a PushPacket
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
// Branch on pushPacket.type and build the ack
if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
// dom or service: process the push, i.e. the service instance change
serviceInfoHolder.processServiceInfo(pushPacket.data);
// Reply with an ack that carries no instance data
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
// dump request: return the service instance info held by this client
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
+ "\"}";
} else {
// Reply with an ack that carries no instance data
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
// Send the response to the server
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
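The receive loop answers every packet with a small JSON ack whose type depends on the packet type. The sketch below extracts only the ack construction into a pure function. PushAckSketch and buildAck are hypothetical names, the type strings "dom", "service", and "dump" are assumptions about the PUSH_PACKAGE_TYPE_* constants (their values are not shown above), and the spacing is normalized to one layout for all three branches.

```java
final class PushAckSketch {
    // Assumed values of PUSH_PACKAGE_TYPE_DOM / _SERVICE / _DUMP.
    static final String TYPE_DOM = "dom";
    static final String TYPE_SERVICE = "service";
    static final String TYPE_DUMP = "dump";

    // Builds the ack text; escapedDump must already be escaped for embedding in JSON.
    static String buildAck(String packetType, long lastRefTime, String escapedDump) {
        String ackType;
        String data = "";
        if (TYPE_DOM.equals(packetType) || TYPE_SERVICE.equals(packetType)) {
            ackType = "push-ack";     // instance change processed, no payload
        } else if (TYPE_DUMP.equals(packetType)) {
            ackType = "dump-ack";     // payload is the client's cached service info
            data = escapedDump;
        } else {
            ackType = "unknown-ack";  // unrecognized type, acknowledge only
        }
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\":\"" + lastRefTime
                + "\", \"data\":\"" + data + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(buildAck("service", 42L, ""));
        System.out.println(buildAck("dump", 42L, "{}"));
        System.out.println(buildAck("other", 42L, ""));
    }
}
```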
public ServiceInfo processServiceInfo(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
serviceInfo.setJsonFromServer(json);
// The actual handling of service instance changes is covered in the next article
return processServiceInfo(serviceInfo);
}
5. Registration order of some Spring Cloud Nacos beans
HealthIndicator
NacosWatch
NacosAutoServiceRegistration
So by the time NacosWatch and NacosAutoServiceRegistration are registered, the HealthIndicator has already initialized the NamingService.
Original article: https://blog.csdn.net/xuwenjingrenca/article/details/125083737