網站首頁 編程語言 正文
1、注冊HealthIndicator
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnAvailableEndpoint
public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosDiscoveryEndpoint(nacosServiceManager, nacosDiscoveryProperties);
}
@Bean
@ConditionalOnEnabledHealthIndicator("nacos-discovery")
public HealthIndicator nacosDiscoveryHealthIndicator(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
return new NacosDiscoveryHealthIndicator(
nacosServiceManager.getNamingService(nacosProperties));
}
}
nacosServiceManager在NacosServiceAutoConfiguration中生成。
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosServiceAutoConfiguration {
@Bean
public NacosServiceManager nacosServiceManager() {
return new NacosServiceManager();
}
}
可以看到,注冊HealthIndicator 時,獲取了NamingService,并構造了NacosDiscoveryHealthIndicator。
public NacosDiscoveryHealthIndicator(NamingService namingService) {
this.namingService = namingService;
}
NacosDiscoveryHealthIndicator中只有一個方法,檢查健康狀態。
protected void doHealthCheck(Health.Builder builder) throws Exception {
// Just return "UP" or "DOWN"
String status = namingService.getServerStatus();
// Set the status to Builder
builder.status(status);
switch (status) {
case STATUS_UP:
builder.up();
break;
case STATUS_DOWN:
builder.down();
break;
default:
builder.unknown();
break;
}
}
2、獲取NamingService
//com.alibaba.cloud.nacos.NacosServiceManager#getNamingService
public NamingService getNamingService(Properties properties) {
if (Objects.isNull(this.namingService)) {
buildNamingService(properties);
}
return namingService;
}
創建NamingService
//com.alibaba.cloud.nacos.NacosServiceManager#buildNamingService
private NamingService buildNamingService(Properties properties) {
if (Objects.isNull(namingService)) {
synchronized (NacosServiceManager.class) {
if (Objects.isNull(namingService)) {
namingService = createNewNamingService(properties);
}
}
}
return namingService;
}
//com.alibaba.cloud.nacos.NacosServiceManager#createNewNamingService
private NamingService createNewNamingService(Properties properties) {
try {
return createNamingService(properties);
}
catch (NacosException e) {
throw new RuntimeException(e);
}
}
//NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
//com.alibaba.nacos.client.naming.NacosNamingService#NacosNamingService
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
3、初始化NamingService
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
//構建ServiceInfoHolder,里面有個定時任務刷新磁盤
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
//生成NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
3.1、ServiceInfoHolder
ServiceInfoHolder構建時會初始化磁盤目錄并且生成任務操作磁盤緩存
public ServiceInfoHolder(String namespace, Properties properties) {
//初始化磁盤緩存目錄
initCacheDir(namespace, properties);
//判斷是否啟動時加載緩存 namingLoadCacheAtStart 配置
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
//故障切換任務
this.failoverReactor = new FailoverReactor(this, cacheDir);
//加載 namingPushEmptyProtection 配置的值
this.pushEmptyProtection = isPushEmptyProtect(properties);
}
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
this.serviceInfoHolder = serviceInfoHolder;
this.failoverDir = cacheDir + FAILOVER_DIR;
// init executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.failover");
return thread;
}
});
this.init();
}
public void init() {
//故障切換 每5秒一次
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
//寫磁盤 30分鐘后開始,每天一次
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
//備份 每10秒一次
// backup file on startup if failover directory is empty.
executorService.schedule(new Runnable() {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
}
}
}, 10000L, TimeUnit.MILLISECONDS);
}
具體的任務就不貼了
3.2、NamingGrpcClientProxy
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
InstancesChangeNotifier changeNotifier) throws NacosException {
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
this.securityProxy = new SecurityProxy(properties, NamingHttpClientManager.getInstance().getNacosRestTemplate());
initSecurityProxy();
//生成NamingHttpClientProxy 非臨時實例使用
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
//生成NamingGrpcClientProxy 臨時實例使用
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
4、心跳的具體邏輯
4.1、NamingGrpcClientProxy 的心跳
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
super(securityProxy, properties);
this.namespaceId = namespaceId;
this.uuid = UUID.randomUUID().toString();
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<String, String>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
//redo 心跳
this.redoService = new NamingGrpcRedoService(this);
start(serverListFactory, serviceInfoHolder);
}
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.serverListFactory(serverListFactory);
rpcClient.registerConnectionListener(redoService);
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.start();
NotifyCenter.registerSubscriber(this);
}
創建NamingGrpcRedoService
private static final String REDO_THREAD_NAME = "com.alibaba.nacos.client.naming.grpc.redo";
private static final int REDO_THREAD = 1;
private static final long DEFAULT_REDO_DELAY = 3000L;
public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy) {
//生成單線程線程池
this.redoExecutor = new ScheduledThreadPoolExecutor(REDO_THREAD, new NameThreadFactory(REDO_THREAD_NAME));
//延遲執行 傳入clientProxy 默認3秒鐘延遲 這里新建了一個RedoScheduledTask
this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), DEFAULT_REDO_DELAY,
DEFAULT_REDO_DELAY, TimeUnit.MILLISECONDS);
}
NameThreadFactory
public class NameThreadFactory implements ThreadFactory {
private final AtomicInteger id = new AtomicInteger(0);
private String name;
public NameThreadFactory(String name) {
if (!name.endsWith(StringUtils.DOT)) {
name += StringUtils.DOT;
}
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
String threadName = name + id.getAndIncrement();
Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
}
心跳的定時任務
public class RedoScheduledTask extends AbstractExecuteTask {
private final NamingGrpcClientProxy clientProxy;
private final NamingGrpcRedoService redoService;
public RedoScheduledTask(NamingGrpcClientProxy clientProxy, NamingGrpcRedoService redoService) {
this.clientProxy = clientProxy;
this.redoService = redoService;
}
@Override
public void run() {
//沒連接上服務端
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
//注冊
redoForInstances();
//訂閱
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
private void redoForInstances() {
//遍歷
for (InstanceRedoData each : redoService.findInstanceRedoData()) {
try {
redoForInstance(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), e);
}
}
}
private void redoForInstance(InstanceRedoData redoData) throws NacosException {
RedoData.RedoType redoType = redoData.getRedoType();
String serviceName = redoData.getServiceName();
String groupName = redoData.getGroupName();
LogUtils.NAMING_LOGGER.info("Redo instance operation {} for {}@@{}", redoType, groupName, serviceName);
switch (redoType) {
case REGISTER:
if (isClientDisabled()) {
return;
}
//調用注冊方法注冊
clientProxy.doRegisterService(serviceName, groupName, redoData.get());
break;
case UNREGISTER:
//非 RUNNING狀態
if (isClientDisabled()) {
return;
}
//服務下線
clientProxy.doDeregisterService(serviceName, groupName, redoData.get());
break;
case REMOVE:
//移除心跳
redoService.removeInstanceForRedo(serviceName, groupName);
break;
default:
}
}
private void redoForSubscribes() {
for (SubscriberRedoData each : redoService.findSubscriberRedoData()) {
try {
redoForSubscribe(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo subscriber operation {} for {}@@{}#{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), each.get(), e);
}
}
}
private void redoForSubscribe(SubscriberRedoData redoData) throws NacosException {
RedoData.RedoType redoType = redoData.getRedoType();
String serviceName = redoData.getServiceName();
String groupName = redoData.getGroupName();
String cluster = redoData.get();
LogUtils.NAMING_LOGGER.info("Redo subscriber operation {} for {}@@{}#{}", redoType, groupName, serviceName, cluster);
switch (redoData.getRedoType()) {
case REGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doSubscribe(serviceName, groupName, cluster);
break;
case UNREGISTER:
if (isClientDisabled()) {
return;
}
clientProxy.doUnsubscribe(serviceName, groupName, cluster);
break;
case REMOVE:
redoService.removeSubscriberForRedo(redoData.getServiceName(), redoData.getGroupName(), redoData.get());
break;
default:
}
}
private boolean isClientDisabled() {
return !clientProxy.isEnable();
}
}
public RedoType getRedoType() {
if (isRegistered() && !isUnregistering()) {
return RedoType.NONE;
} else if (isRegistered() && isUnregistering()) {
return RedoType.UNREGISTER;
} else if (!isRegistered() && !isUnregistering()) {
return RedoType.REGISTER;
} else {
return RedoType.REMOVE;
}
}
redoData什么時候會將registered重新設置為false呢
public class NamingGrpcRedoService implements ConnectionEventListener
NamingGrpcRedoService實現了ConnectionEventListener接口
public void onDisConnect() {
connected = false;
LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
synchronized (registeredInstances) {
registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
}
synchronized (subscribes) {
subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
}
LogUtils.NAMING_LOGGER.warn("mark to redo completed");
}
當監聽到com.alibaba.nacos.common.remote.client.RpcClient.ConnectionEvent 事件eventType == DISCONNECTED時,會回調這個方法,將所有的redoData的registered設置為false,從而可以觸發任務的重新注冊和訂閱。
當構造NamingGrpcClientProxy時會調rpcClient的start方法,構建了一個線程池。想詳細了解這個方法可以去翻一下。
//com.alibaba.nacos.common.remote.client.RpcClient#start
public final void start() throws NacosException {
...
//這里又構建了一個線程池。
clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
}
});
// connection event consumer.
//從 eventLinkedBlockingQueue 獲取ConnectionEvent事件,并做相應操作
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take = null;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
//Do nothing
}
}
}
});
//這個線程判斷是否需要重新連接,重新連接是會發布ConnectionEvent DISCONNECTED
//具體邏輯不貼了
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
...
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
//Do nothing
}
}
}
});
...
}
具體的邏輯就不粘貼了
//com.alibaba.nacos.common.remote.client.RpcClient#reconnect
protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequestFail) {
...
if (connectionNew != null) {
...
if (currentConnection != null) {
...
closeConnection(currentConnection);
}
...
}
if (isShutdown()) {
closeConnection(currentConnection);
}
...
}
closeConnection 方法會向eventLinkedBlockingQueue中添加一個ConnectionEvent.DISCONNECTED。
com.alibaba.nacos.common.remote.client.RpcClient#shutdown也會調用該方法。
//com.alibaba.nacos.common.remote.client.RpcClient#closeConnection
private void closeConnection(Connection connection) {
if (connection != null) {
connection.close();
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
}
}
4.2、NamingHttpClientProxy
NamingHttpClientProxy 有兩個線程池,分別是心跳和接收服務端推送
public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
Properties properties, ServiceInfoHolder serviceInfoHolder) {
super(securityProxy, properties);
this.serverListManager = serverListManager;
this.setServerPort(DEFAULT_SERVER_PORT);
this.namespaceId = namespaceId;
//創建心跳
this.beatReactor = new BeatReactor(this, properties);
//接收服務端推送信息
this.pushReceiver = new PushReceiver(serviceInfoHolder);
this.maxRetry = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_REQUEST_DOMAIN_RETRY_COUNT,
String.valueOf(UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT)));
}
4.2.1、NamingHttpClientProxy 的心跳
BeatReactor
public BeatReactor(NamingHttpClientProxy serverProxy, Properties properties) {
this.serverProxy = serverProxy;
int threadCount = initClientBeatThreadCount(properties);
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
public static final int DEFAULT_CLIENT_BEAT_THREAD_COUNT =
ThreadUtils.getSuitableThreadCount(1) > 1 ? ThreadUtils.getSuitableThreadCount(1) / 2 : 1;
private int initClientBeatThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
}
//NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
return ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
添加心跳
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加心跳信息
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
...
構建心跳信息
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
//com.alibaba.nacos.api.naming.pojo.Instance#getInstanceHeartBeatInterval
//public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
public long getInstanceHeartBeatInterval() {
//HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
private long getMetaDataByKeyWithDefault(final String key, final long defaultValue) {
if (getMetadata() == null || getMetadata().isEmpty()) {
return defaultValue;
}
final String value = getMetadata().get(key);
if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {
return Long.parseLong(value);
}
return defaultValue;
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
//啟動
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
BeatTask是BeatReactor的內部類
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
發起請求
//com.alibaba.nacos.client.naming.remote.http.NamingHttpClientProxy#sendBeat
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put(IP_PARAM, beatInfo.getIp());
params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
//reqApi和上篇的一樣
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
4.2.2、PushReceiver 接收服務端的推送消息
public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
try {
this.serviceInfoHolder = serviceInfoHolder;
//獲取udp的端口
String udpPort = getPushReceiverUdpPort();
//創建udpSocket
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
//構建線程池
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
//執行
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
PushReceiver 實現了runnable接口
//com.alibaba.nacos.client.naming.core.PushReceiver#run
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
//接收udp數據
udpSocket.receive(packet);
//將接收的udp數據轉為json
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
//將接收的udp數據轉為PushPacket
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
//判斷 pushPacket.type 處理 并構建ack
if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
//dom或者service,處理請求,處理服務實例變更
serviceInfoHolder.processServiceInfo(pushPacket.data);
//返回ack,無實例數據
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
//dump請求,將本實例保存的服務實例信息返回
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
+ "\"}";
} else {
//返回ack,無實例數據
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
//給服務端發送響應
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
public ServiceInfo processServiceInfo(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
serviceInfo.setJsonFromServer(json);
//具體處理服務實例變更,下一篇中有具體邏輯
return processServiceInfo(serviceInfo);
}
5、關于Spring Cloud Nacos一些bean的注冊順序
HealthIndicator
NacosWatch
NacosAutoServiceRegistration
所以在注冊NacosWatch和NacosAutoServiceRegistration的時候,HealthIndicator已經把NamingService給初始化好了。
原文鏈接:https://blog.csdn.net/xuwenjingrenca/article/details/125083737
相關推薦
- 2023-11-22 Docker常用命令、指令
- 2022-08-01 Win?Server2016遠程桌面如何允許多用戶同時登錄_win服務器
- 2023-03-18 go?sync.Map基本原理深入解析_Golang
- 2023-03-22 Python裝飾器使用方法全面梳理_python
- 2022-07-21 C#?PDF轉圖片(JPG,Png)的項目實踐_C#教程
- 2023-07-13 react父子組件,任意組件傳值
- 2022-09-07 Redis?sentinel哨兵集群的實現步驟_Redis
- 2022-08-31 C++淺析引用的定義與使用_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支