
[Exploring SpringCloud] Service Discovery - Data Structures and Model on the Nacos Server

Author: Evan_L    Updated: 2024-07-18

Preface

In the previous article, we learned about Nacos's service data structure from the official diagrams. But one question kept nagging me: wasn't Nacos 2.x rewritten? Why would it still use that data structure? My guess was that the old model is only kept for compatibility with Nacos 1.x, and that the actual storage looks different. So let's take that question into the source code.

Starting from how NamingService is used

Before diving into the source code, let's recall how we actually use Nacos:

  1. Build a NamingService
    NamingService serviceRegistry = NacosFactory.createNamingService(properties);
    Behind the scenes, this call establishes the connection to the Nacos server.
  2. Register a service instance
    serviceRegistry.registerInstance(serviceName, groupName, instance);
  3. Query a service
    serviceRegistry.getAllInstances(serviceName, groupName, List.of(clusterName));
    We will follow these three operations through the source code; a short, runnable sketch of the three calls follows right after this list.
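
For reference, here is a minimal, self-contained sketch of those three calls. It assumes a Nacos server reachable at 127.0.0.1:8848 and uses made-up service/group/cluster names; adjust them to your environment.

    import java.util.List;
    import java.util.Properties;
    import com.alibaba.nacos.api.NacosFactory;
    import com.alibaba.nacos.api.naming.NamingService;
    import com.alibaba.nacos.api.naming.pojo.Instance;

    public class NamingServiceQuickstart {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            // Assumed local server address; change as needed.
            properties.setProperty("serverAddr", "127.0.0.1:8848");

            // 1. Build a NamingService (this connects to the Nacos server).
            NamingService serviceRegistry = NacosFactory.createNamingService(properties);

            // 2. Register an ephemeral instance of "demo-service" in group "DEFAULT_GROUP".
            Instance instance = new Instance();
            instance.setIp("192.168.0.10");
            instance.setPort(8080);
            instance.setClusterName("DEFAULT");
            instance.setEphemeral(true);
            serviceRegistry.registerInstance("demo-service", "DEFAULT_GROUP", instance);

            // 3. Query all instances of the service (as we will see, this also subscribes).
            List<Instance> instances =
                    serviceRegistry.getAllInstances("demo-service", "DEFAULT_GROUP", List.of("DEFAULT"));
            instances.forEach(i -> System.out.println(i.getIp() + ":" + i.getPort()));
        }
    }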

!!! Heads-up: lots of source code ahead !!!

If you don't have the patience for the source code, feel free to jump straight to the summary at the end for the conclusions.

Building the NamingService

Client side

    // com.alibaba.nacos.client.naming.NacosNamingService
    /**
     * Init method.
     * <p>Called by the NacosNamingService constructor to initialize the NamingService.</p>
     */
    private void init(Properties properties) throws NacosException {
        final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
        // omitted...
        // Create the client proxy
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
    }

    // com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate
    /**
     * NamingClientProxyDelegate constructor.
     */    
    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,
        InstancesChangeNotifier changeNotifier) throws NacosException {
        // omitted...
        // Two clients are initialized: one HTTP, one gRPC. When registering an instance, ephemeral instances go through gRPC, so we focus on the gRPC client.
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
    }
    
    // com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
    /**
     * NamingGrpcClientProxy constructor.
     */    
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        // omitted...
        // Create the RPC client
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        this.redoService = new NamingGrpcRedoService(this);
        // Start the client
        start(serverListFactory, serviceInfoHolder);
    }
    // com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        // Start the client
        rpcClient.start();
        NotifyCenter.registerSubscriber(this);
    }
    
    // com.alibaba.nacos.common.remote.client.RpcClient#start
    /**
     * Start the client.
     */
    public final void start() throws NacosException {
        // Make sure start() runs only once
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }
        // Create a scheduled thread pool with only 2 threads
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
        
        // Submit the task that handles connection events
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take;
                try {
                    take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
                } catch (Throwable e) {
                    // exception handling simplified in this excerpt
                }
            }
        });
        // Submit the heartbeat task
        clientEventExecutor.submit(() -> {
            while (true) {
                // A pile of logic is omitted here. Roughly:
                // 1. Once the interval has elapsed, send a heartbeat (health-check) request.
                // 1.1 If the heartbeat fails, mark the current status as unhealthy and record the context.
                // 1.2 Check whether the currently recommended Nacos server is in the server list; if so, try to reconnect to it.
            }
        });
        
        // connect to server, try to connect to server sync retryTimes times, async starting if failed.
        // Connect to the server: try synchronously up to retryTimes times; if that still fails, fall back to connecting asynchronously.
        Connection connectToServer = null;
        rpcClientStatus.set(RpcClientStatus.STARTING);
        
        int startUpRetryTimes = rpcClientConfig.retryTimes();
        while (startUpRetryTimes > 0 && connectToServer == null) {
            try {
                startUpRetryTimes--;
                ServerInfo serverInfo = nextRpcServer();
                // Connect to the server
                connectToServer = connectToServer(serverInfo);
            } catch (Throwable e) {
                LoggerUtils.printIfWarnEnabled(LOGGER,
                        "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                        rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
            }
            
        }
        
        if (connectToServer != null) {
            this.currentConnection = connectToServer;
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        } else {
            switchServerAsync();
        }
        
        registerServerRequestHandler(new ConnectResetRequestHandler());
        
        // register client detection request.
        // Register a handler for ClientDetectionRequest, used to answer server-side probes.
        registerServerRequestHandler(request -> {
            if (request instanceof ClientDetectionRequest) {
                return new ClientDetectionResponse();
            }
            
            return null;
        });
        
    }
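
Stripped of Nacos specifics, the startup logic above follows a common pattern: a CAS on a status field guarantees a single start, a bounded loop tries to connect synchronously, and on failure the client falls back to an asynchronous reconnect. A minimal, hypothetical sketch of that pattern (none of the names below are Nacos classes):

    import java.util.concurrent.atomic.AtomicReference;

    public class RetryingStarter {
        enum Status { INITIALIZED, STARTING, RUNNING }

        private final AtomicReference<Status> status = new AtomicReference<>(Status.INITIALIZED);

        public void start(int retryTimes) {
            // CAS guard: only the first caller actually starts the client.
            if (!status.compareAndSet(Status.INITIALIZED, Status.STARTING)) {
                return;
            }
            boolean connected = false;
            while (retryTimes-- > 0 && !connected) {
                try {
                    connected = tryConnectOnce();   // bounded synchronous attempts
                } catch (Exception e) {
                    System.out.println("connect failed, retries left: " + retryTimes);
                }
            }
            if (connected) {
                status.set(Status.RUNNING);
            } else {
                switchServerAsync();                // keep retrying in the background
            }
        }

        private boolean tryConnectOnce() { return false; }           // placeholder
        private void switchServerAsync() { /* schedule an async reconnect */ }
    }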

Server side - handling connection setup

On the server side, the first class to find is GrpcServer.


    @Override
    public void startServer() throws Exception {
        // 1. Create the request handler registry
        final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
        // 2. Register the request handlers and wrap them with interceptors. The result is somewhat like Spring MVC's HandlerAdapter.
        addServices(handlerRegistry, new GrpcConnectionInterceptor());
        NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
        // omitted
        
        server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
                .compressorRegistry(CompressorRegistry.getDefaultInstance())
                .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
                .addTransportFilter(new AddressTransportFilter(connectionManager))
                .keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
                .keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
                .permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
                .build();
        // Start the server
        server.start();
    }


    private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {

        // unary common call register.
        // Register the generic unary call
        final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
                .setType(MethodDescriptor.MethodType.UNARY)
                .setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
                        GrpcServerConstants.REQUEST_METHOD_NAME))
                .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
                .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
        // Define the server call handler. The core handling logic lives in this lambda, and it boils down to one call:
        // grpcCommonRequestAcceptor.request(request, responseObserver)
        final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
                .asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

        final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
                        GrpcServerConstants.REQUEST_SERVICE_NAME)
                .addMethod(unaryPayloadMethod, payloadHandler).build();
        handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));

        // bi stream register.
        // The bidirectional-stream service, mainly for connection setup and disconnection.
        // Its core handling logic:
        // grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)
        final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
                (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));

        final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
                .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                        .generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
                                GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))
                .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
                .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

        final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
                .builder(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
        handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));

    }

Handling the connection setup request:

    // com.alibaba.nacos.core.remote.grpc.GrpcBiStreamRequestAcceptor
    @Override
    public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
        
        StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
            
            final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
            
            final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();
            
            final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();
            
            String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();
            
            String clientIp = "";
            
            @Override
            public void onNext(Payload payload) {
                // Handle the connection setup request
                clientIp = payload.getMetadata().getClientIp();
                traceDetailIfNecessary(payload);
                
                Object parseObj;
                // omitted...
                // validation
                
                if (parseObj instanceof ConnectionSetupRequest) {
                    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                    // set labels, omitted
                    
                    // Build the Connection
                    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                    metaInfo.setTenant(setUpRequest.getTenant());
                    // The third argument, the Channel, is what actually carries data over the network
                    Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
                    connection.setAbilities(setUpRequest.getAbilities());
                    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                    // Register the connection; the interesting part is the "or" condition:
                    // connectionManager.register
                    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                        //Not register to the connection manager if current server is over limit or server is starting.
                        // Registration fails if the server is over its connection limit or is still starting up.
                        connection.request(new ConnectResetRequest(), 3000L);
                        connection.close();
                    }
                }            
                // omitted...
            }
            // omitted...
        };
        
        return streamObserver;
    }

Here we meet the first concept: the Connection. It carries a ConnectionMeta with information about the connection; when the server needs to send a request over this connection, that information is set on the Request, which is then converted into a Payload via GrpcUtils and sent out.
Let's continue with com.alibaba.nacos.core.remote.ConnectionManager#register:

    public synchronized boolean register(String connectionId, Connection connection) {
        
        if (connection.isConnected()) {
            String clientIp = connection.getMetaInfo().clientIp;
            // argument checks omitted
            // Register the connection
            connections.put(connectionId, connection);
            // Track the number of connections per client IP
            if (!connectionForClientIp.containsKey(clientIp)) {
                connectionForClientIp.put(clientIp, new AtomicInteger(0));
            }
            connectionForClientIp.get(clientIp).getAndIncrement();
            
            // Notify the client-connection listeners
            clientConnectionEventListenerRegistry.notifyClientConnected(connection);
            
            return true;
            
        }
        return false;
        
    }

Here we meet the first Manager: ConnectionManager, which manages all client connections. After recording the connection, it calls clientConnected on every registered listener. One of those listeners is ConnectionBasedClientManager which, as the name suggests, is responsible for managing clients.

    // > ConnectionBasedClientManager#clientConnected(com.alibaba.nacos.core.remote.Connection)
    //  > ConnectionBasedClientManager#clientConnected(java.lang.String, com.alibaba.nacos.naming.core.v2.client.ClientAttributes)
    // ConnectionBasedClientManager
    @Override
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
        ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
        // Create the client through a ClientFactory.
        // From these two lines and ClientConstants.CONNECTION_TYPE we can tell the factory is ConnectionBasedClientFactory, so the client it produces is a ConnectionBasedClient.
        return clientConnected(clientFactory.newClient(clientId, attributes));
    }
    @Override
    public boolean clientConnected(final Client client) {
        // Record the client
        clients.computeIfAbsent(client.getClientId(), s -> {
            return (ConnectionBasedClient) client;
        });
        return true;
    }

So we have found another concept: the Client. Every client connected over gRPC is managed by ConnectionBasedClientManager.
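
To make the relationship concrete, here is a deliberately simplified toy model (the names below are hypothetical, not Nacos classes) of the two registries seen so far: a connection registry keyed by connectionId that notifies listeners, and a client registry maintained by one of those listeners.

    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    class ToyConnection {
        final String connectionId;
        final String clientIp;
        ToyConnection(String connectionId, String clientIp) {
            this.connectionId = connectionId;
            this.clientIp = clientIp;
        }
    }

    interface ToyConnectionListener {
        void clientConnected(ToyConnection connection);
    }

    // Analogous to ConnectionManager: owns the transport-level connections.
    class ToyConnectionManager {
        private final ConcurrentHashMap<String, ToyConnection> connections = new ConcurrentHashMap<>();
        private final List<ToyConnectionListener> listeners = new CopyOnWriteArrayList<>();

        void addListener(ToyConnectionListener listener) { listeners.add(listener); }

        boolean register(ToyConnection connection) {
            connections.put(connection.connectionId, connection);
            listeners.forEach(l -> l.clientConnected(connection));  // fan out to listeners
            return true;
        }
    }

    // Analogous to ConnectionBasedClientManager: owns the naming-level client records,
    // keyed by the same id as the connection.
    class ToyClientManager implements ToyConnectionListener {
        private final ConcurrentHashMap<String, String> clients = new ConcurrentHashMap<>();

        @Override
        public void clientConnected(ToyConnection connection) {
            // In Nacos the value would be a ConnectionBasedClient; a plain string stands in for it here.
            clients.computeIfAbsent(connection.connectionId, id -> connection.clientIp);
        }
    }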

Quick recap

Concept      Class                                             Manager
Connection   com.alibaba.nacos.core.remote.Connection          ConnectionManager
Client       com.alibaba.nacos.naming.core.v2.client.Client    ClientManager

Registering an instance

Client side

The registration call eventually reaches doRegisterService in the gRPC client proxy; that is the part worth focusing on:

    public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        // Build the request. Every Request type has a corresponding Handler on the Nacos server.
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                NamingRemoteConstants.REGISTER_INSTANCE, instance);
        requestToServer(request, Response.class);
        redoService.instanceRegistered(serviceName, groupName);
    }

Server side

When we walked through server startup, we noted that this handler is the one responsible for generic (unary) requests:

final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

Let's follow it down.

    // com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request
    @Override
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        
        String type = grpcRequest.getMetadata().getType();
        // The following is omitted:
        // - check that the server has started
        // - if this is a client-to-server health check, respond directly
        // ----------------------------
        // Look up the handler for this request type
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        // omitted: "no handler found" error handling
        // ----------------------------
        String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
        
        // omitted: check that the connection is healthy
        Object parseObj = null;
        parseObj = GrpcUtils.parse(grpcRequest);
        // omitted: parse errors and invalid-request handling
        
        Request request = (Request) parseObj;
        // Fetch the corresponding Connection from the ConnectionManager
        Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
        // Assemble the RequestMeta
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        // Dispatch to the requestHandler
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
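
The dispatch above is essentially a map from a request-type string to a handler. A minimal, hypothetical sketch of that pattern (assumed names, not the actual Nacos registry):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    class ToyRequestDispatcher {
        // request-type name -> handler
        private final Map<String, Function<Object, Object>> handlers = new ConcurrentHashMap<>();

        void register(String requestType, Function<Object, Object> handler) {
            handlers.put(requestType, handler);
        }

        Object dispatch(String requestType, Object request) {
            Function<Object, Object> handler = handlers.get(requestType);
            if (handler == null) {
                throw new IllegalArgumentException("no handler found for " + requestType);
            }
            return handler.apply(request);
        }
    }

    // Usage: the server registers one handler per request type at startup, e.g.
    // dispatcher.register("InstanceRequest", req -> handleInstanceRequest(req));   // hypothetical handler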

That is the core of generic request handling. Now let's see how InstanceRequest is handled, in com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler:

    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        "unsupported request type " + request.getType()); // (message simplified)
        }
    }
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
            throws NacosException {
        // 1. Register the instance
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        // 2. Publish a RegisterInstanceTraceEvent
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                request.getInstance().getIp(), request.getInstance().getPort()));
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

// Registering the instance:
// com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        // Get the registered Service singleton from the ServiceManager. Since we are in the middle of registering an instance, this method must do more than a plain lookup (see below).
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // omitted: if the singleton is a persistent service, registering an ephemeral instance conflicts with it and an exception is thrown.
        Client client = clientManager.getClient(clientId);

        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        // Record the instance published by this client
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();
        // Publish the registration events
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

    // com.alibaba.nacos.naming.core.v2.ServiceManager
    /**
     * Get singleton service. Put to manager if no singleton.
     * Get the singleton Service (singleton meaning there is exactly one such object in the whole application); register it with the manager if it does not exist yet.
     */
    public Service getSingleton(Service service) {
        // Register the service if it is not present yet
        singletonRepository.computeIfAbsent(service, key -> {
            // Publish a service-metadata event. This event only matters for persistent instances.
            NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
            return service;
        });
        Service result = singletonRepository.get(service);
        // Record the service under its namespace
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
    // Now let's follow ClientOperationEvent.ClientRegisterServiceEvent
    // > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#onEvent
    //  > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation
    //   > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#addPublisherIndexes
    // Record which client publishes the service
    private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

Quick recap

Let's summarize the concepts involved so far.

Concept      Class                                             Manager
Service      com.alibaba.nacos.naming.core.v2.pojo.Service     ServiceManager

Besides that concept, we have also had a look at the internal structure of a Client.
AbstractClient:

  • Services published by this client: ConcurrentHashMap<Service, InstancePublishInfo> publishers
  • Services subscribed to by this client: ConcurrentHashMap<Service, Subscriber> subscribers

    Strictly speaking, the subscribers map only comes up when we analyze the subscription request, but it is listed here so the information is not scattered.

ClientServiceIndexesManager

  • The manager of client indexes. "Index" here means being able to find the relevant clients quickly from a Service. Since the clients themselves already live in a ClientManager, storing them again here would make the data harder to maintain, so only the clientIds are stored; presumably that is also why it is called the client index manager.
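
Putting the pieces together, the lookup path is: Service -> clientIds (ClientServiceIndexesManager) -> Client (ClientManager) -> the client's publishers map. Here is a deliberately simplified toy model of that layout (hypothetical names, not Nacos classes), with a lookup method that mirrors what the query path does, as we will see next:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class ToyNamingIndex {
        // Analogous to AbstractClient#publishers: what each client has published.
        // clientId -> (service name -> instance info)
        private final Map<String, Map<String, String>> clientPublishers = new ConcurrentHashMap<>();

        // Analogous to ClientServiceIndexesManager#publisherIndexes: service name -> clientIds.
        private final Map<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

        void registerInstance(String clientId, String service, String instanceInfo) {
            clientPublishers
                    .computeIfAbsent(clientId, id -> new ConcurrentHashMap<>())
                    .put(service, instanceInfo);
            publisherIndexes
                    .computeIfAbsent(service, s -> ConcurrentHashMap.newKeySet())
                    .add(clientId);
        }

        // Resolve clientIds through the index, then pull each client's published instance.
        List<String> getAllInstances(String service) {
            List<String> result = new ArrayList<>();
            for (String clientId : publisherIndexes.getOrDefault(service, Set.of())) {
                Map<String, String> published = clientPublishers.get(clientId);
                if (published != null && published.containsKey(service)) {
                    result.add(published.get(service));
                }
            }
            return result;
        }
    }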

Querying and subscribing to instances

> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>)
 > com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean)
  > com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe)
   > com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe
    > com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribe
    public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    	// The key here is SubscribeServiceRequest: knowing the request type tells us which server-side handler to look at.
        SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);
        SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
        redoService.subscriberRegistered(serviceName, groupName, clusters);
        return response.getServiceInfo();
    }
    

Server side

    // com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler
    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
        // The subscriber
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
                namespaceId, groupedServiceName, 0, request.getClusters());
        // The service info. A few of the arguments here are produced by other methods:
        // serviceStorage.getData(service) gathers the instance information, collecting it from the ClientManager;
        // ServiceUtil.selectInstancesWithHealthyProtection filters by cluster and health, and applies the protection threshold.
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
                true, subscriber.getIp());
        if (request.isSubscribe()) {
            // Subscribe to the service
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
            // Publish the subscription trace event
            NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
        } else {
            // Unsubscribe
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
            NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
	
    // > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getData
    //  > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData
    //   > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getAllInstancesFromIndex
    // > com.alibaba.nacos.naming.utils.ServiceUtil#selectInstancesWithHealthyProtection(com.alibaba.nacos.api.naming.pojo.ServiceInfo, com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata, java.lang.String, boolean, boolean, java.lang.String)
    //  > com.alibaba.nacos.naming.utils.ServiceUtil#doSelectInstances
    // The lines above are the call paths; below are the methods that actually produce the data.
    // Data production in ServiceStorage
    public ServiceInfo getPushData(Service service) {
        ServiceInfo result = emptyServiceInfo(service);
        if (!ServiceManager.getInstance().containSingleton(service)) {
            return result;
        }
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        result.setHosts(getAllInstancesFromIndex(singleton));
        // Collect the service's instances via the ServiceManager and indexes, and record the result in ServiceStorage#serviceDataIndexes
        serviceDataIndexes.put(singleton, result);
        return result;
    }
    private List<Instance> getAllInstancesFromIndex(Service service) {
        Set<Instance> result = new HashSet<>();
        Set<String> clusters = new HashSet<>();
        
        for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
            // Fetch the instance info
            Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
            if (instancePublishInfo.isPresent()) {
                InstancePublishInfo publishInfo = instancePublishInfo.get();
                //If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance list
                if (publishInfo instanceof BatchInstancePublishInfo) {
                    BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;
                    List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);
                    result.addAll(batchInstance);
                } else {
                    Instance instance = parseInstance(service, instancePublishInfo.get());
                    result.add(instance);
                    clusters.add(instance.getClusterName());
                }
            }
        }
        // cache clusters of this service
        // Cache the cluster names of this service
        serviceClusterIndex.put(service, clusters);
        return new LinkedList<>(result);
    }
        
    private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
        // Use the clientId to get the Client, then read the services it has published.
        Client client = clientManager.getClient(clientId);
        if (null == client) {
            return Optional.empty();
        }
        return Optional.ofNullable(client.getInstancePublishInfo(service));
    }

Looking at the query path, we find a data store called ServiceStorage. Interestingly, although it is named a storage, the data in it is actually pulled from elsewhere: the ServiceManager, the ClientServiceIndexesManager and the ClientManager. Seen that way, it behaves more like a cache.
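
In other words, ServiceStorage acts like a pull-through cache: reads are served from a map when an entry exists, and the map is (re)filled by recomputing from the managers and indexes. A tiny, hypothetical sketch of that behavior (not the actual ServiceStorage code):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    class ToyServiceCache {
        private final Map<String, List<String>> serviceDataIndexes = new ConcurrentHashMap<>();
        private final Function<String, List<String>> loader;  // recomputes from the managers/index

        ToyServiceCache(Function<String, List<String>> loader) {
            this.loader = loader;
        }

        // Analogous to getData: serve the cached value if present, otherwise compute it.
        List<String> getData(String service) {
            List<String> cached = serviceDataIndexes.get(service);
            return cached != null ? cached : getPushData(service);
        }

        // Analogous to getPushData: always recompute and refresh the cache entry.
        List<String> getPushData(String service) {
            List<String> fresh = loader.apply(service);
            serviceDataIndexes.put(service, fresh);
            return fresh;
        }
    }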

Summary

The walkthrough above piled up a lot of source code, which gets tedious to read. If you are interested, dig deeper along the excerpts above. To make it easier, here is a diagram:
(Diagram: the Nacos 2.x server-side data model and its managers - not reproduced here.)
And another one to highlight how the data is produced:
(Diagram: the data production and lookup flow - not reproduced here.)
As the diagrams show, the Nacos 2.x data structure is not the official Service -> Group -> Instance hierarchy. Instead, Connection, Client and Service are each managed by their own manager, and, to avoid storing the same data in several places, ClientServiceIndexesManager acts as the bridge between Client and Service.
On top of that there is ServiceStorage, which acts as a data cache. When you look at ServiceStorage more closely, though, you will find that its data consistency/refresh only happens when the server periodically pushes to subscribed clients, via com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData. Personally I think there is room for optimization here: the cache could be kept up to date by listening to the events that mark instances coming and going.

In short, setting aside the ServiceStorage cache, the data mainly lives in the following managers:
ConnectionManager, ClientManager, ServiceManager and ClientServiceIndexesManager.

At this point you might ask: where did the Group and Cluster concepts go?
They have become plain attributes/fields:
com.alibaba.nacos.naming.core.v2.pojo.Service#group
com.alibaba.nacos.api.naming.pojo.Instance#clusterName
Even when ServiceStorage assembles a ServiceInfo, they are stored as attributes, and ServiceUtil uses them to filter the target instances.
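
Schematically (toy classes for illustration only, not the actual Nacos POJOs): group and cluster are just fields on the service and instance objects, not levels of a tree.

    // The real classes are com.alibaba.nacos.naming.core.v2.pojo.Service
    // and com.alibaba.nacos.api.naming.pojo.Instance.
    class ToyService {
        String namespace;
        String group;        // the "group" level is just a field
        String name;
    }

    class ToyInstance {
        String ip;
        int port;
        String clusterName;  // the "cluster" level is just a field
    }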

Finally, a reminder: we only analyzed ephemeral instances here, which is the most common scenario. Of course, if you use Nacos persistent instances, Spring Cloud supports them just as naturally. We won't dig into that here; if you are interested, follow the same path and explore persistent instances.

Afterword

Dissecting source code at this depth and digging into one implementation detail takes a lot of time and a lot of space; I feel as if I have nearly pasted the whole Nacos code base into this post. Please bear with me.
As for Nacos's consistency protocols, I won't get into them here; that topic deserves a post of its own, with comparisons against other distributed middleware, plus the underlying theory.
Next time, let's move on to OpenFeign.

Recommended reading

The implementation principles of Nacos are covered in the official e-book 《Nacos架構&原理》 (Nacos Architecture & Principles); if you want to understand the top-level design, it is well worth a read.

Original article: https://blog.csdn.net/Evan_L/article/details/132513335
