日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nacos服務(wù)端數(shù)據(jù)結(jié)構(gòu)和模型

作者:Evan_L 更新時(shí)間: 2024-07-18 編程語(yǔ)言

前言

上一文中,我們從官方的圖示了解到Nacos的服務(wù)數(shù)據(jù)結(jié)構(gòu)。但我關(guān)心的是,Nacos2.x不是重構(gòu)了嗎?怎么還是這種數(shù)據(jù)結(jié)構(gòu)?我推測(cè),必然是為了對(duì)Nacos1.x的兼容,實(shí)際存儲(chǔ)應(yīng)該不是這樣的。于是,沿著這個(gè)問題出發(fā)我們一起來(lái)翻一下源碼。

從NamingService的使用開始

在扎入源碼之前,我們需要回憶一下,我們是怎么使用Nacos的?

  1. 構(gòu)建NamingService
    NamingService serviceRegistry = NacosFactory.createNamingService(properties);
    實(shí)際上,這個(gè)動(dòng)作的背后,意味著我們連接了Nacos服務(wù)端。
  2. 注冊(cè)服務(wù)
    serviceRegistry.registerInstance(serviceName, groupName, instance);
  3. 查詢服務(wù)
    serviceRegistry.getAllInstances(serviceName, groupName, List.of(clusterName));
    因此,我們就沿著這幾個(gè)操作,摸一摸源碼。

?。?!高能警告?。?!

沒有耐心看源碼的同學(xué),可以直接翻到總結(jié),直接看結(jié)論。

構(gòu)建NamingService

客戶端

    // com.alibaba.nacos.client.naming.NacosNamingService
    /**
     * 初始化方法
     * <p>由NacosNamingService構(gòu)造器調(diào)用,用于初始NamingService</p>
     */
    private void init(Properties properties) throws NacosException {
        final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
        // 省略...
        // 創(chuàng)建客戶端
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
    }

    // com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate
    /**
     * NamingClientProxyDelegate構(gòu)造器
     */    
    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,
        InstancesChangeNotifier changeNotifier) throws NacosException {
        // 省略...
        // 初始化了兩個(gè)客戶端,一個(gè)是Http,另一個(gè)是Grpc。不過,在注冊(cè)實(shí)例時(shí),如果該實(shí)例為臨時(shí)實(shí)例,則使用Grpc,因此我們重點(diǎn)關(guān)注Grpc
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                serviceInfoHolder);
    }
    
    // com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
    /**
     * NamingGrpcClientProxy構(gòu)造器
     */    
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        // 省略...
        // 創(chuàng)建RPC客戶端
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        this.redoService = new NamingGrpcRedoService(this);
        // 啟動(dòng)客戶端
        start(serverListFactory, serviceInfoHolder);
    }
    // com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy
    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        // 啟動(dòng)客戶端
        rpcClient.start();
        NotifyCenter.registerSubscriber(this);
    }
    
    // com.alibaba.nacos.common.remote.client.RpcClient#start
    /**
     * 啟動(dòng)客戶端
     */
    public final void start() throws NacosException {
        // 控制只啟動(dòng)一次
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }
        // 創(chuàng)建一個(gè)只有2個(gè)線程的定時(shí)任務(wù)線程池
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
        
        // 提交-處理連接事件的Task
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take;
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            }
        });
        // 提交-心跳任務(wù)
        clientEventExecutor.submit(() -> {
            while (true) {
            // 由于這里有一大堆邏輯,省略。
            // 1. 超過時(shí)間間隔,發(fā)起心跳請(qǐng)求
            // 1.1 心跳請(qǐng)求失敗,記錄當(dāng)前狀態(tài)為不健康,并記錄上下文。
            // 1.2 檢查當(dāng)前配置的推薦的Nacos服務(wù)器是否在服務(wù)器列表中。在,則嘗試重新連接推薦的服務(wù)器。
        });
        
        // connect to server, try to connect to server sync retryTimes times, async starting if failed.
        // 連接服務(wù)端,嘗試retryTimes次,同步地連接服務(wù)端,如果依然失敗,則改為異步連接。
        Connection connectToServer = null;
        rpcClientStatus.set(RpcClientStatus.STARTING);
        
        int startUpRetryTimes = rpcClientConfig.retryTimes();
        while (startUpRetryTimes > 0 && connectToServer == null) {
            try {
                startUpRetryTimes--;
                ServerInfo serverInfo = nextRpcServer();
                // 連接服務(wù)器
                connectToServer = connectToServer(serverInfo);
            } catch (Throwable e) {
                LoggerUtils.printIfWarnEnabled(LOGGER,
                        "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                        rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
            }
            
        }
        
        if (connectToServer != null) {
            this.currentConnection = connectToServer;
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        } else {
            switchServerAsync();
        }
        
        registerServerRequestHandler(new ConnectResetRequestHandler());
        
        // register client detection request.
        // 注冊(cè)客戶端檢測(cè)請(qǐng)求處理器,用于響應(yīng)服務(wù)端的探測(cè)
        registerServerRequestHandler(request -> {
            if (request instanceof ClientDetectionRequest) {
                return new ClientDetectionResponse();
            }
            
            return null;
        });
        
    }

服務(wù)端-處理連接請(qǐng)求

服務(wù)端的源碼首先我們得找到GrpcServer


    @Override
    public void startServer() throws Exception {
    	// 1. 創(chuàng)建請(qǐng)求處理器注冊(cè)器
        final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
        // 2. 注冊(cè)請(qǐng)求處理器,并封裝攔截器器。封裝后,有點(diǎn)類似于SpringMVC的HandlerAdapter
        addServices(handlerRegistry, new GrpcConnectionInterceptor());
        NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
		// 省略
		
        server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
                .compressorRegistry(CompressorRegistry.getDefaultInstance())
                .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
                .addTransportFilter(new AddressTransportFilter(connectionManager))
                .keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
                .keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
                .permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
                .build();
		// 啟動(dòng)服務(wù)
        server.start();
    }


    private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {

        // unary common call register.
        // 通用調(diào)用注冊(cè)
        final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
                .setType(MethodDescriptor.MethodType.UNARY)
                .setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
                        GrpcServerConstants.REQUEST_METHOD_NAME))
                .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
                .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
		// 定義服務(wù)器調(diào)用處理器。核心處理邏輯可就在這lambda表達(dá)式定義的匿名內(nèi)部類里了。也只有一個(gè)方法:
		// grpcCommonRequestAcceptor.request(request, responseObserver)
        final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
                .asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

        final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
                        GrpcServerConstants.REQUEST_SERVICE_NAME)
                .addMethod(unaryPayloadMethod, payloadHandler).build();
        handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));

        // bi stream register.
        // bi流式調(diào)用服務(wù),主要是連接請(qǐng)求、連接斷開
        // 核心處理邏輯:
		// grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)
        final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
                (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));

        final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
                .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                        .generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
                                GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))
                .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
                .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

        final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
                .builder(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
        handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));

    }

處理連接請(qǐng)求:

	// com.alibaba.nacos.core.remote.grpc.GrpcBiStreamRequestAcceptor
    @Override
    public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
        
        StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {
            
            final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
            
            final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();
            
            final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();
            
            String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();
            
            String clientIp = "";
            
            @Override
            public void onNext(Payload payload) {
                // 處理連接請(qǐng)求
                clientIp = payload.getMetadata().getClientIp();
                traceDetailIfNecessary(payload);
                
                Object parseObj;
                // 省略...
				// 檢查
				
                if (parseObj instanceof ConnectionSetupRequest) {
                    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                    // 設(shè)置label,省略
                    
                    // 構(gòu)建Connection
                    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                    metaInfo.setTenant(setUpRequest.getTenant());
                    // 第三個(gè)參數(shù)Channel,是發(fā)生網(wǎng)路數(shù)據(jù)的關(guān)鍵
                    Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
                    connection.setAbilities(setUpRequest.getAbilities());
                    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
                    // 注冊(cè)連接, 重點(diǎn)在 “或” 條件上
                    // connectionManager.register
                    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                        //Not register to the connection manager if current server is over limit or server is starting.
                        // 如果當(dāng)前服務(wù)器已超限制,或者服務(wù)器還在啟動(dòng)過程中,則注冊(cè)失敗。
                            connection.request(new ConnectResetRequest(), 3000L);
                            connection.close();
                    }
                }            
            	// 省略。。。
            }
            // 省略。。。
        };
        
        return streamObserver;
    }

這里出現(xiàn)了我們接觸到的第一個(gè)概念:Connection-連接,他有個(gè)屬性ConnectionMeta,記錄連接相關(guān)的信息。當(dāng)需要發(fā)起請(qǐng)求時(shí),他會(huì)將這些信息設(shè)置到Request中,然后通過GrpcUtils轉(zhuǎn)換成Payload發(fā)出請(qǐng)求
繼續(xù)看com.alibaba.nacos.core.remote.ConnectionManager#register

    public synchronized boolean register(String connectionId, Connection connection) {
        
        if (connection.isConnected()) {
            String clientIp = connection.getMetaInfo().clientIp;
            // 省略入?yún)z查
            // 注冊(cè)客戶端
            connections.put(connectionId, connection);
            // 登記客戶端IP
            if (!connectionForClientIp.containsKey(clientIp)) {
                connectionForClientIp.put(clientIp, new AtomicInteger(0));
            }
            connectionForClientIp.get(clientIp).getAndIncrement();
            
            // 通知客戶端連接Listener
            clientConnectionEventListenerRegistry.notifyClientConnected(connection);
            
            return true;
            
        }
        return false;
        
    }

此處出現(xiàn)第一個(gè)Manager:ConnectionManager。用來(lái)管理所有客戶端的連接。登記連接后,調(diào)用了所有的Listener的clientConnected方法。其中,有個(gè)ConnectionBasedClientManager,看名字就知道,可能是負(fù)責(zé)管理客戶端的。

	// > ConnectionBasedClientManager#clientConnected(com.alibaba.nacos.core.remote.Connection)
	//  > ConnectionBasedClientManager#clientConnected(java.lang.String, com.alibaba.nacos.naming.core.v2.client.ClientAttributes)
	// ConnectionBasedClientManager
	@Override
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
        ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
        // 通過ClientFactory創(chuàng)建客戶端
        // 從以上的兩行代碼,我們通過ClientConstants.CONNECTION_TYPE就知道工廠是ConnectionBasedClientFactory,對(duì)應(yīng)的客戶端自然是ConnectionBasedClient
        return clientConnected(clientFactory.newClient(clientId, attributes));
    }
    @Override
    public boolean clientConnected(final Client client) {
    	// 登記客戶端
        clients.computeIfAbsent(client.getClientId(), s -> {
            return (ConnectionBasedClient) client;
        });
        return true;
    }

至此,我們又發(fā)現(xiàn)一個(gè)新概念:Client-客戶端。由Grpc連接的客戶端,都由ConnectionBasedClientManager進(jìn)行管理。

小結(jié)

概念 管理者
連接 com.alibaba.nacos.core.remote.Connection ConnectionManager
客戶端 com.alibaba.nacos.naming.core.v2.client.Client ClientManager

注冊(cè)實(shí)例

客戶端

在這里插入圖片描述
我們重點(diǎn)看看

    public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    	// 創(chuàng)建請(qǐng)求。每個(gè)Request在Nacos服務(wù)端都由對(duì)應(yīng)的Handler
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                NamingRemoteConstants.REGISTER_INSTANCE, instance);
        requestToServer(request, Response.class);
        redoService.instanceRegistered(serviceName, groupName);
    }

服務(wù)端

我們前面說(shuō)服務(wù)端啟動(dòng)時(shí),說(shuō)這個(gè)是負(fù)責(zé)處理通用請(qǐng)求的:

final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

我們就順著往下看

	// com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request
    @Override
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        
        String type = grpcRequest.getMetadata().getType();
        // 省略如下內(nèi)容:
        // 檢查服務(wù)是否已啟動(dòng)
        // 如果是客戶端對(duì)服務(wù)端的健康檢查,則直接響應(yīng)
        // ----------------------------
        // 從對(duì)應(yīng)的請(qǐng)求處理器
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        // 省略:no handler found. 的異常處理
        // ----------------------------
        String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
        
        // 省略:檢查連接是否正常.
        Object parseObj = null;
        parseObj = GrpcUtils.parse(grpcRequest);
		// 省略:轉(zhuǎn)換異常、無(wú)效請(qǐng)求異常
        
        Request request = (Request) parseObj;
			// 從ConnectionManager獲取到對(duì)應(yīng)的Connection
            Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
            // 組裝RequestMeta
            RequestMeta requestMeta = new RequestMeta();
            requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
            requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
            requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
            requestMeta.setLabels(connection.getMetaInfo().getLabels());
            connectionManager.refreshActiveTime(requestMeta.getConnectionId());
            // 調(diào)用requestHandler處理請(qǐng)求
            Response response = requestHandler.handleRequest(request, requestMeta);
            Payload payloadResponse = GrpcUtils.convert(response);
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
    }

這些便是通用請(qǐng)求處理的核心邏輯。現(xiàn)在我們便來(lái)看InstanceRequest的處理com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler

    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException
        }
    }
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
            throws NacosException {
        // 1. 注冊(cè)實(shí)例
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        // 2. 發(fā)布事件:RegisterInstanceTraceEvent
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                request.getInstance().getIp(), request.getInstance().getPort()));
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

// 注冊(cè)實(shí)例:
// com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
    	// 從ServiceManager獲取已注冊(cè)服務(wù)。而我們當(dāng)前是要注冊(cè)實(shí)例,所以,這個(gè)方法肯定還內(nèi)含玄機(jī)
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // 省略:如果獲取到的是持久化實(shí)例,意味著當(dāng)前注冊(cè)臨時(shí)實(shí)例沖突了,返回異常。
        Client client = clientManager.getClient(clientId);

        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        // 記錄當(dāng)前客戶端發(fā)布的實(shí)例
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();
        // 發(fā)布服務(wù)注冊(cè)事件
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

	// com.alibaba.nacos.naming.core.v2.ServiceManager
    /**
     * Get singleton service. Put to manager if no singleton.
     * 獲取單例服務(wù)(單例意味著整個(gè)應(yīng)用只有一個(gè)對(duì)象),如果不存在,則注冊(cè)到Manager
     */
    public Service getSingleton(Service service) {
    	// 如果不存在就注冊(cè)
        singletonRepository.computeIfAbsent(service, key -> {
        	// 發(fā)布服務(wù)元信息數(shù)據(jù)事件。不過該事件對(duì)于持久實(shí)例才有用處。
            NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
            return service;
        });
        Service result = singletonRepository.get(service);
        // 將服務(wù)登記到namespace
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
	// 再看看ClientOperationEvent.ClientRegisterServiceEvent
	// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#onEvent
	//  > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation
	//   > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#addPublisherIndexes
	// 登記發(fā)布服務(wù)的客戶端
    private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

小結(jié)

我們總結(jié)一下,以上涉及到的概念。

概念 管理者 描述
服務(wù) com.alibaba.nacos.naming.core.v2.pojo.Service ServiceManager

除了這個(gè)概念,實(shí)際上我們還看到Client的內(nèi)部結(jié)構(gòu):
AbstractClient:

  • 記錄客戶端發(fā)布的服務(wù):ConcurrentHashMap<Service, InstancePublishInfo> publishers
  • 記錄客戶端訂閱的服務(wù):ConcurrentHashMap<Service, Subscriber> subscribers

    這個(gè)點(diǎn)其實(shí)要到訂閱服務(wù)請(qǐng)求才會(huì)分析到,但為了信息不會(huì)太分散,所以就放到一起了。

ClientServiceIndexesManager

  • 客戶端索引管理者。這里的索引指的是,通過Service快速找到客戶端,只是客戶端有ClientManager,如果這里再存一份也不合適,不利于數(shù)據(jù)維護(hù)。因此這里存的是clientId。估計(jì)也是如此,他才叫客戶端索引管理者。

查詢和訂閱實(shí)例

> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>)
 > com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean)
  > com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe)
   > com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe
    > com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribe
    public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    	// 重點(diǎn)SubscribeServiceRequest,看服務(wù)端代碼需要知道是什么請(qǐng)求
        SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);
        SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
        redoService.subscriberRegistered(serviceName, groupName, clusters);
        return response.getServiceInfo();
    }
    

服務(wù)端

	// com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler
    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
        // 訂閱者
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
                namespaceId, groupedServiceName, 0, request.getClusters());
        // 服務(wù)信息,這里有幾個(gè)參數(shù)是需要通過方法來(lái)獲取的
        // serviceStorage.getData(service),負(fù)責(zé)獲取實(shí)例信息。會(huì)從ClientManage收集
        // ServiceUtil.selectInstancesWithHealthyProtection,肩負(fù)著過濾cluster、健康實(shí)例,并執(zhí)行自動(dòng)保護(hù)機(jī)制。
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
                true, subscriber.getIp());
        if (request.isSubscribe()) {
        	// 訂閱服務(wù)
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
            // 發(fā)布訂閱事件
            NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
        } else {
        	// 取消訂閱
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
            NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                    meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
	
	// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getData
	//  > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData
	//   > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getAllInstancesFromIndex
	// > com.alibaba.nacos.naming.utils.ServiceUtil#selectInstancesWithHealthyProtection(com.alibaba.nacos.api.naming.pojo.ServiceInfo, com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata, java.lang.String, boolean, boolean, java.lang.String)
	//  > com.alibaba.nacos.naming.utils.ServiceUtil#doSelectInstances
	// 上面是調(diào)用路徑,這里把生產(chǎn)數(shù)據(jù)的方法重點(diǎn)撈出來(lái)
	// ServiceStorage的數(shù)據(jù)生產(chǎn)
    public ServiceInfo getPushData(Service service) {
        ServiceInfo result = emptyServiceInfo(service);
        if (!ServiceManager.getInstance().containSingleton(service)) {
            return result;
        }
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        result.setHosts(getAllInstancesFromIndex(singleton));
        // 從ServiceManager拿到服務(wù)的實(shí)例信息,并登記到ServiceStorage#serviceDataIndexes中
        serviceDataIndexes.put(singleton, result);
        return result;
    }
    private List<Instance> getAllInstancesFromIndex(Service service) {
        Set<Instance> result = new HashSet<>();
        Set<String> clusters = new HashSet<>();
        
        for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
        	// 獲取實(shí)例信息
            Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
            if (instancePublishInfo.isPresent()) {
                InstancePublishInfo publishInfo = instancePublishInfo.get();
                //If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance list
                if (publishInfo instanceof BatchInstancePublishInfo) {
                    BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;
                    List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);
                    result.addAll(batchInstance);
                } else {
                    Instance instance = parseInstance(service, instancePublishInfo.get());
                    result.add(instance);
                    clusters.add(instance.getClusterName());
                }
            }
        }
        // cache clusters of this service
        // 緩存集群信息
        serviceClusterIndex.put(service, clusters);
        return new LinkedList<>(result);
    }
        
    private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
    	// 通過客戶端ID,獲取到Client,進(jìn)而從其獲取客戶端發(fā)布的服務(wù)。
        Client client = clientManager.getClient(clientId);
        if (null == client) {
            return Optional.empty();
        }
        return Optional.ofNullable(client.getInstancePublishInfo(service));
    }

從查詢實(shí)例這里,我們看到有個(gè)數(shù)據(jù)存儲(chǔ):ServiceStorage。重要的是,這個(gè)雖然叫存儲(chǔ),但是實(shí)際上里面的數(shù)據(jù)卻是從別處獲取來(lái)的。來(lái)源于:ServiceManager、ServiceIndexesManager、ClientManager。從這個(gè)角度說(shuō),更像是個(gè)緩存。

總結(jié)

上面的整了一堆源代碼,容易看煩了。感興趣的,可以根據(jù)上面的源碼深入看看。為了方便大家,我畫了圖給大家:
在這里插入圖片描述
為了讓大家重點(diǎn)看到數(shù)據(jù)生產(chǎn)過程:
在這里插入圖片描述
從圖中,我們可以看到,nacos2.x的數(shù)據(jù)結(jié)構(gòu)并不像官方的Service->Group->Instance。而是按照Connection、Client、Service分別通過對(duì)應(yīng)的管理器進(jìn)行管理。此外,為了避免數(shù)據(jù)多處存儲(chǔ),還有ClientServiceIndexesManager作為Client和Service的橋梁。
除此之外,還有ServiceStorage,作為數(shù)據(jù)緩存。不過,當(dāng)我們深入了解ServiceStorage時(shí),會(huì)發(fā)現(xiàn)他的數(shù)據(jù)一致性/數(shù)據(jù)的更新,是在給訂閱服務(wù)的客戶端定時(shí)推送時(shí)通過調(diào)用com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData來(lái)實(shí)現(xiàn)的。個(gè)人認(rèn)為這是有可以優(yōu)化空間的,他們完全可以通過各種事件來(lái)監(jiān)聽實(shí)例的生死來(lái)更新數(shù)據(jù)。

總而言之,如果不算ServiceStorage這個(gè)緩存,那么數(shù)據(jù)主要存在于一下的Manager中:
ConnectionManager、ClientManager、ServiceManager、ClientServiceIndexesManager。

到這里,可能有同學(xué)就有疑問了。那么Group、Cluster這些概念去哪了呢?
這些概念都成為了屬性/字段了。
com.alibaba.nacos.naming.core.v2.pojo.Service#group
com.alibaba.nacos.api.naming.pojo.Instance#clusterName
即使在ServiceStorage封裝ServiceInfo時(shí),他們也是作為屬性來(lái)存儲(chǔ)的。通過ServiceUtil來(lái)過濾目標(biāo)實(shí)例。

最后,提醒大家一下,我們這里只是分析了臨時(shí)實(shí)例。是最常用的場(chǎng)景。當(dāng)然,如果我們用Nacos的持久實(shí)例,SpringCloud也就自然支持了持久實(shí)例。不過,咱們不深究了,感興趣的同學(xué),可以順著往下挖一挖持久實(shí)例。

后記

這種深度刨析源碼、深挖一個(gè)技術(shù)細(xì)節(jié)的實(shí)現(xiàn),太費(fèi)時(shí)間、也太費(fèi)篇幅了。我自己都感覺差點(diǎn)把整個(gè)nacos的源碼都搬上來(lái)了。莫見怪。。。
關(guān)于nacos的一致性協(xié)議,就不在這里聊了,這個(gè)東西得單獨(dú)倒騰,還要與其他分布式中間件相互對(duì)比,還有理論。。
下次,咱們先往后聊OpenFeign。

推薦

Nacos的實(shí)現(xiàn)原理在官網(wǎng)有一電子書《Nacos架構(gòu)&原理》,想要了解頂層設(shè)計(jì)原理的同學(xué),建議看看。

原文鏈接:https://blog.csdn.net/Evan_L/article/details/132513335

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新