網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nacos服務(wù)端數(shù)據(jù)結(jié)構(gòu)和模型
作者:Evan_L 更新時(shí)間: 2024-07-18 編程語(yǔ)言前言
上一文中,我們從官方的圖示了解到Nacos的服務(wù)數(shù)據(jù)結(jié)構(gòu)。但我關(guān)心的是,Nacos2.x不是重構(gòu)了嗎?怎么還是這種數(shù)據(jù)結(jié)構(gòu)?我推測(cè),必然是為了對(duì)Nacos1.x的兼容,實(shí)際存儲(chǔ)應(yīng)該不是這樣的。于是,沿著這個(gè)問題出發(fā)我們一起來(lái)翻一下源碼。
從NamingService的使用開始
在扎入源碼之前,我們需要回憶一下,我們是怎么使用Nacos的?
- 構(gòu)建NamingService
NamingService serviceRegistry = NacosFactory.createNamingService(properties);
實(shí)際上,這個(gè)動(dòng)作的背后,意味著我們連接了Nacos服務(wù)端。 - 注冊(cè)服務(wù)
serviceRegistry.registerInstance(serviceName, groupName, instance);
- 查詢服務(wù)
serviceRegistry.getAllInstances(serviceName, groupName, List.of(clusterName));
因此,我們就沿著這幾個(gè)操作,摸一摸源碼。
?。?!高能警告?。?!
沒有耐心看源碼的同學(xué),可以直接翻到總結(jié),直接看結(jié)論。
構(gòu)建NamingService
客戶端
// com.alibaba.nacos.client.naming.NacosNamingService
/**
* 初始化方法
* <p>由NacosNamingService構(gòu)造器調(diào)用,用于初始NamingService</p>
*/
private void init(Properties properties) throws NacosException {
final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
// 省略...
// 創(chuàng)建客戶端
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
}
// com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate
/**
* NamingClientProxyDelegate構(gòu)造器
*/
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,
InstancesChangeNotifier changeNotifier) throws NacosException {
// 省略...
// 初始化了兩個(gè)客戶端,一個(gè)是Http,另一個(gè)是Grpc。不過,在注冊(cè)實(shí)例時(shí),如果該實(shí)例為臨時(shí)實(shí)例,則使用Grpc,因此我們重點(diǎn)關(guān)注Grpc
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
/**
* NamingGrpcClientProxy構(gòu)造器
*/
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
// 省略...
// 創(chuàng)建RPC客戶端
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
this.redoService = new NamingGrpcRedoService(this);
// 啟動(dòng)客戶端
start(serverListFactory, serviceInfoHolder);
}
// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.serverListFactory(serverListFactory);
rpcClient.registerConnectionListener(redoService);
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
// 啟動(dòng)客戶端
rpcClient.start();
NotifyCenter.registerSubscriber(this);
}
// com.alibaba.nacos.common.remote.client.RpcClient#start
/**
* 啟動(dòng)客戶端
*/
public final void start() throws NacosException {
// 控制只啟動(dòng)一次
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
if (!success) {
return;
}
// 創(chuàng)建一個(gè)只有2個(gè)線程的定時(shí)任務(wù)線程池
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
});
// 提交-處理連接事件的Task
clientEventExecutor.submit(() -> {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
}
});
// 提交-心跳任務(wù)
clientEventExecutor.submit(() -> {
while (true) {
// 由于這里有一大堆邏輯,省略。
// 1. 超過時(shí)間間隔,發(fā)起心跳請(qǐng)求
// 1.1 心跳請(qǐng)求失敗,記錄當(dāng)前狀態(tài)為不健康,并記錄上下文。
// 1.2 檢查當(dāng)前配置的推薦的Nacos服務(wù)器是否在服務(wù)器列表中。在,則嘗試重新連接推薦的服務(wù)器。
});
// connect to server, try to connect to server sync retryTimes times, async starting if failed.
// 連接服務(wù)端,嘗試retryTimes次,同步地連接服務(wù)端,如果依然失敗,則改為異步連接。
Connection connectToServer = null;
rpcClientStatus.set(RpcClientStatus.STARTING);
int startUpRetryTimes = rpcClientConfig.retryTimes();
while (startUpRetryTimes > 0 && connectToServer == null) {
try {
startUpRetryTimes--;
ServerInfo serverInfo = nextRpcServer();
// 連接服務(wù)器
connectToServer = connectToServer(serverInfo);
} catch (Throwable e) {
LoggerUtils.printIfWarnEnabled(LOGGER,
"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
}
}
if (connectToServer != null) {
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
} else {
switchServerAsync();
}
registerServerRequestHandler(new ConnectResetRequestHandler());
// register client detection request.
// 注冊(cè)客戶端檢測(cè)請(qǐng)求處理器,用于響應(yīng)服務(wù)端的探測(cè)
registerServerRequestHandler(request -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
});
}
服務(wù)端-處理連接請(qǐng)求
服務(wù)端的源碼首先我們得找到GrpcServer
@Override
public void startServer() throws Exception {
// 1. 創(chuàng)建請(qǐng)求處理器注冊(cè)器
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// 2. 注冊(cè)請(qǐng)求處理器,并封裝攔截器器。封裝后,有點(diǎn)類似于SpringMVC的HandlerAdapter
addServices(handlerRegistry, new GrpcConnectionInterceptor());
NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
// 省略
server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new AddressTransportFilter(connectionManager))
.keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
.permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
.build();
// 啟動(dòng)服務(wù)
server.start();
}
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// unary common call register.
// 通用調(diào)用注冊(cè)
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
GrpcServerConstants.REQUEST_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// 定義服務(wù)器調(diào)用處理器。核心處理邏輯可就在這lambda表達(dá)式定義的匿名內(nèi)部類里了。也只有一個(gè)方法:
// grpcCommonRequestAcceptor.request(request, responseObserver)
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
GrpcServerConstants.REQUEST_SERVICE_NAME)
.addMethod(unaryPayloadMethod, payloadHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
// bi流式調(diào)用服務(wù),主要是連接請(qǐng)求、連接斷開
// 核心處理邏輯:
// grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
.builder(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
處理連接請(qǐng)求:
// com.alibaba.nacos.core.remote.grpc.GrpcBiStreamRequestAcceptor
@Override
public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();
final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();
String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();
String clientIp = "";
@Override
public void onNext(Payload payload) {
// 處理連接請(qǐng)求
clientIp = payload.getMetadata().getClientIp();
traceDetailIfNecessary(payload);
Object parseObj;
// 省略...
// 檢查
if (parseObj instanceof ConnectionSetupRequest) {
ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
// 設(shè)置label,省略
// 構(gòu)建Connection
ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
// 第三個(gè)參數(shù)Channel,是發(fā)生網(wǎng)路數(shù)據(jù)的關(guān)鍵
Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
connection.setAbilities(setUpRequest.getAbilities());
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
// 注冊(cè)連接, 重點(diǎn)在 “或” 條件上
// connectionManager.register
if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
//Not register to the connection manager if current server is over limit or server is starting.
// 如果當(dāng)前服務(wù)器已超限制,或者服務(wù)器還在啟動(dòng)過程中,則注冊(cè)失敗。
connection.request(new ConnectResetRequest(), 3000L);
connection.close();
}
}
// 省略。。。
}
// 省略。。。
};
return streamObserver;
}
這里出現(xiàn)了我們接觸到的第一個(gè)概念:Connection-連接,他有個(gè)屬性ConnectionMeta,記錄連接相關(guān)的信息。當(dāng)需要發(fā)起請(qǐng)求時(shí),他會(huì)將這些信息設(shè)置到Request中,然后通過GrpcUtils轉(zhuǎn)換成Payload發(fā)出請(qǐng)求
繼續(xù)看com.alibaba.nacos.core.remote.ConnectionManager#register
public synchronized boolean register(String connectionId, Connection connection) {
if (connection.isConnected()) {
String clientIp = connection.getMetaInfo().clientIp;
// 省略入?yún)z查
// 注冊(cè)客戶端
connections.put(connectionId, connection);
// 登記客戶端IP
if (!connectionForClientIp.containsKey(clientIp)) {
connectionForClientIp.put(clientIp, new AtomicInteger(0));
}
connectionForClientIp.get(clientIp).getAndIncrement();
// 通知客戶端連接Listener
clientConnectionEventListenerRegistry.notifyClientConnected(connection);
return true;
}
return false;
}
此處出現(xiàn)第一個(gè)Manager:ConnectionManager。用來(lái)管理所有客戶端的連接。登記連接后,調(diào)用了所有的Listener的clientConnected方法。其中,有個(gè)ConnectionBasedClientManager,看名字就知道,可能是負(fù)責(zé)管理客戶端的。
// > ConnectionBasedClientManager#clientConnected(com.alibaba.nacos.core.remote.Connection)
// > ConnectionBasedClientManager#clientConnected(java.lang.String, com.alibaba.nacos.naming.core.v2.client.ClientAttributes)
// ConnectionBasedClientManager
@Override
public boolean clientConnected(String clientId, ClientAttributes attributes) {
String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
// 通過ClientFactory創(chuàng)建客戶端
// 從以上的兩行代碼,我們通過ClientConstants.CONNECTION_TYPE就知道工廠是ConnectionBasedClientFactory,對(duì)應(yīng)的客戶端自然是ConnectionBasedClient
return clientConnected(clientFactory.newClient(clientId, attributes));
}
@Override
public boolean clientConnected(final Client client) {
// 登記客戶端
clients.computeIfAbsent(client.getClientId(), s -> {
return (ConnectionBasedClient) client;
});
return true;
}
至此,我們又發(fā)現(xiàn)一個(gè)新概念:Client-客戶端。由Grpc連接的客戶端,都由ConnectionBasedClientManager進(jìn)行管理。
小結(jié)
概念 | 類 | 管理者 |
---|---|---|
連接 | com.alibaba.nacos.core.remote.Connection | ConnectionManager |
客戶端 | com.alibaba.nacos.naming.core.v2.client.Client | ClientManager |
注冊(cè)實(shí)例
客戶端
我們重點(diǎn)看看
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
// 創(chuàng)建請(qǐng)求。每個(gè)Request在Nacos服務(wù)端都由對(duì)應(yīng)的Handler
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
服務(wù)端
我們前面說(shuō)服務(wù)端啟動(dòng)時(shí),說(shuō)這個(gè)是負(fù)責(zé)處理通用請(qǐng)求的:
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
我們就順著往下看
// com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
String type = grpcRequest.getMetadata().getType();
// 省略如下內(nèi)容:
// 檢查服務(wù)是否已啟動(dòng)
// 如果是客戶端對(duì)服務(wù)端的健康檢查,則直接響應(yīng)
// ----------------------------
// 從對(duì)應(yīng)的請(qǐng)求處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
// 省略:no handler found. 的異常處理
// ----------------------------
String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
// 省略:檢查連接是否正常.
Object parseObj = null;
parseObj = GrpcUtils.parse(grpcRequest);
// 省略:轉(zhuǎn)換異常、無(wú)效請(qǐng)求異常
Request request = (Request) parseObj;
// 從ConnectionManager獲取到對(duì)應(yīng)的Connection
Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
// 組裝RequestMeta
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// 調(diào)用requestHandler處理請(qǐng)求
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
這些便是通用請(qǐng)求處理的核心邏輯。現(xiàn)在我們便來(lái)看InstanceRequest的處理com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler
@Override
@Secured(action = ActionTypes.WRITE)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException
}
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
throws NacosException {
// 1. 注冊(cè)實(shí)例
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
// 2. 發(fā)布事件:RegisterInstanceTraceEvent
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
// 注冊(cè)實(shí)例:
// com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// 從ServiceManager獲取已注冊(cè)服務(wù)。而我們當(dāng)前是要注冊(cè)實(shí)例,所以,這個(gè)方法肯定還內(nèi)含玄機(jī)
Service singleton = ServiceManager.getInstance().getSingleton(service);
// 省略:如果獲取到的是持久化實(shí)例,意味著當(dāng)前注冊(cè)臨時(shí)實(shí)例沖突了,返回異常。
Client client = clientManager.getClient(clientId);
InstancePublishInfo instanceInfo = getPublishInfo(instance);
// 記錄當(dāng)前客戶端發(fā)布的實(shí)例
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
// 發(fā)布服務(wù)注冊(cè)事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
// com.alibaba.nacos.naming.core.v2.ServiceManager
/**
* Get singleton service. Put to manager if no singleton.
* 獲取單例服務(wù)(單例意味著整個(gè)應(yīng)用只有一個(gè)對(duì)象),如果不存在,則注冊(cè)到Manager
*/
public Service getSingleton(Service service) {
// 如果不存在就注冊(cè)
singletonRepository.computeIfAbsent(service, key -> {
// 發(fā)布服務(wù)元信息數(shù)據(jù)事件。不過該事件對(duì)于持久實(shí)例才有用處。
NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
return service;
});
Service result = singletonRepository.get(service);
// 將服務(wù)登記到namespace
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
// 再看看ClientOperationEvent.ClientRegisterServiceEvent
// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#onEvent
// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation
// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#addPublisherIndexes
// 登記發(fā)布服務(wù)的客戶端
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
小結(jié)
我們總結(jié)一下,以上涉及到的概念。
概念 | 類 | 管理者 | 描述 |
---|---|---|---|
服務(wù) | com.alibaba.nacos.naming.core.v2.pojo.Service | ServiceManager |
除了這個(gè)概念,實(shí)際上我們還看到Client的內(nèi)部結(jié)構(gòu):
AbstractClient:
- 記錄客戶端發(fā)布的服務(wù):ConcurrentHashMap<Service, InstancePublishInfo> publishers
- 記錄客戶端訂閱的服務(wù):ConcurrentHashMap<Service, Subscriber> subscribers
這個(gè)點(diǎn)其實(shí)要到訂閱服務(wù)請(qǐng)求才會(huì)分析到,但為了信息不會(huì)太分散,所以就放到一起了。
ClientServiceIndexesManager
- 客戶端索引管理者。這里的索引指的是,通過Service快速找到客戶端,只是客戶端有ClientManager,如果這里再存一份也不合適,不利于數(shù)據(jù)維護(hù)。因此這里存的是clientId。估計(jì)也是如此,他才叫客戶端索引管理者。
查詢和訂閱實(shí)例
> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>)
> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean)
> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe)
> com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe
> com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribe
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
// 重點(diǎn)SubscribeServiceRequest,看服務(wù)端代碼需要知道是什么請(qǐng)求
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
redoService.subscriberRegistered(serviceName, groupName, clusters);
return response.getServiceInfo();
}
服務(wù)端
// com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler
@Override
@Secured(action = ActionTypes.READ)
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
// 訂閱者
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
// 服務(wù)信息,這里有幾個(gè)參數(shù)是需要通過方法來(lái)獲取的
// serviceStorage.getData(service),負(fù)責(zé)獲取實(shí)例信息。會(huì)從ClientManage收集
// ServiceUtil.selectInstancesWithHealthyProtection,肩負(fù)著過濾cluster、健康實(shí)例,并執(zhí)行自動(dòng)保護(hù)機(jī)制。
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
true, subscriber.getIp());
if (request.isSubscribe()) {
// 訂閱服務(wù)
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
// 發(fā)布訂閱事件
NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
} else {
// 取消訂閱
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getData
// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData
// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getAllInstancesFromIndex
// > com.alibaba.nacos.naming.utils.ServiceUtil#selectInstancesWithHealthyProtection(com.alibaba.nacos.api.naming.pojo.ServiceInfo, com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata, java.lang.String, boolean, boolean, java.lang.String)
// > com.alibaba.nacos.naming.utils.ServiceUtil#doSelectInstances
// 上面是調(diào)用路徑,這里把生產(chǎn)數(shù)據(jù)的方法重點(diǎn)撈出來(lái)
// ServiceStorage的數(shù)據(jù)生產(chǎn)
public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
Service singleton = ServiceManager.getInstance().getSingleton(service);
result.setHosts(getAllInstancesFromIndex(singleton));
// 從ServiceManager拿到服務(wù)的實(shí)例信息,并登記到ServiceStorage#serviceDataIndexes中
serviceDataIndexes.put(singleton, result);
return result;
}
private List<Instance> getAllInstancesFromIndex(Service service) {
Set<Instance> result = new HashSet<>();
Set<String> clusters = new HashSet<>();
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
// 獲取實(shí)例信息
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
InstancePublishInfo publishInfo = instancePublishInfo.get();
//If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance list
if (publishInfo instanceof BatchInstancePublishInfo) {
BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;
List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);
result.addAll(batchInstance);
} else {
Instance instance = parseInstance(service, instancePublishInfo.get());
result.add(instance);
clusters.add(instance.getClusterName());
}
}
}
// cache clusters of this service
// 緩存集群信息
serviceClusterIndex.put(service, clusters);
return new LinkedList<>(result);
}
private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
// 通過客戶端ID,獲取到Client,進(jìn)而從其獲取客戶端發(fā)布的服務(wù)。
Client client = clientManager.getClient(clientId);
if (null == client) {
return Optional.empty();
}
return Optional.ofNullable(client.getInstancePublishInfo(service));
}
從查詢實(shí)例這里,我們看到有個(gè)數(shù)據(jù)存儲(chǔ):ServiceStorage。重要的是,這個(gè)雖然叫存儲(chǔ),但是實(shí)際上里面的數(shù)據(jù)卻是從別處獲取來(lái)的。來(lái)源于:ServiceManager、ServiceIndexesManager、ClientManager。從這個(gè)角度說(shuō),更像是個(gè)緩存。
總結(jié)
上面的整了一堆源代碼,容易看煩了。感興趣的,可以根據(jù)上面的源碼深入看看。為了方便大家,我畫了圖給大家:
為了讓大家重點(diǎn)看到數(shù)據(jù)生產(chǎn)過程:
從圖中,我們可以看到,nacos2.x的數(shù)據(jù)結(jié)構(gòu)并不像官方的Service->Group->Instance。而是按照Connection、Client、Service分別通過對(duì)應(yīng)的管理器進(jìn)行管理。此外,為了避免數(shù)據(jù)多處存儲(chǔ),還有ClientServiceIndexesManager作為Client和Service的橋梁。
除此之外,還有ServiceStorage,作為數(shù)據(jù)緩存。不過,當(dāng)我們深入了解ServiceStorage時(shí),會(huì)發(fā)現(xiàn)他的數(shù)據(jù)一致性/數(shù)據(jù)的更新,是在給訂閱服務(wù)的客戶端定時(shí)推送時(shí)通過調(diào)用com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData
來(lái)實(shí)現(xiàn)的。個(gè)人認(rèn)為這是有可以優(yōu)化空間的,他們完全可以通過各種事件來(lái)監(jiān)聽實(shí)例的生死來(lái)更新數(shù)據(jù)。
總而言之,如果不算ServiceStorage這個(gè)緩存,那么數(shù)據(jù)主要存在于一下的Manager中:
ConnectionManager、ClientManager、ServiceManager、ClientServiceIndexesManager。
到這里,可能有同學(xué)就有疑問了。那么Group、Cluster這些概念去哪了呢?
這些概念都成為了屬性/字段了。com.alibaba.nacos.naming.core.v2.pojo.Service#group
com.alibaba.nacos.api.naming.pojo.Instance#clusterName
即使在ServiceStorage封裝ServiceInfo時(shí),他們也是作為屬性來(lái)存儲(chǔ)的。通過ServiceUtil來(lái)過濾目標(biāo)實(shí)例。
最后,提醒大家一下,我們這里只是分析了臨時(shí)實(shí)例。是最常用的場(chǎng)景。當(dāng)然,如果我們用Nacos的持久實(shí)例,SpringCloud也就自然支持了持久實(shí)例。不過,咱們不深究了,感興趣的同學(xué),可以順著往下挖一挖持久實(shí)例。
后記
這種深度刨析源碼、深挖一個(gè)技術(shù)細(xì)節(jié)的實(shí)現(xiàn),太費(fèi)時(shí)間、也太費(fèi)篇幅了。我自己都感覺差點(diǎn)把整個(gè)nacos的源碼都搬上來(lái)了。莫見怪。。。
關(guān)于nacos的一致性協(xié)議,就不在這里聊了,這個(gè)東西得單獨(dú)倒騰,還要與其他分布式中間件相互對(duì)比,還有理論。。
下次,咱們先往后聊OpenFeign。
推薦
Nacos的實(shí)現(xiàn)原理在官網(wǎng)有一電子書《Nacos架構(gòu)&原理》,想要了解頂層設(shè)計(jì)原理的同學(xué),建議看看。
原文鏈接:https://blog.csdn.net/Evan_L/article/details/132513335
- 上一篇:沒有了
- 下一篇:沒有了
相關(guān)推薦
- 2022-10-01 django中資源文件夾的引入及配置方法_python
- 2022-12-08 Anaconda中pkgs文件夾及如何清空PKGS_相關(guān)技巧
- 2022-06-18 C語(yǔ)言詳細(xì)講解#error與#line如何使用_C 語(yǔ)言
- 2022-12-01 C++中高性能內(nèi)存池的實(shí)現(xiàn)詳解_C 語(yǔ)言
- 2022-06-25 關(guān)于Ubuntu?Server?18.04?LTS?安裝Tomcat并配置systemctl管理To
- 2022-09-19 C++四種cast使用詳細(xì)介紹_C 語(yǔ)言
- 2022-04-06 go語(yǔ)言限制協(xié)程并發(fā)數(shù)的方案詳情_Golang
- 2022-04-23 二次開發(fā)element el-tooltip+span 超出文本部分顯示省略號(hào)鼠標(biāo)懸浮顯示全部?jī)?nèi)容
- 欄目分類
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支