日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

注冊中心eureka的介紹及源碼探索

作者:Gimtom 更新時間: 2022-07-30 編程語言

1. 背景

1.1. 注冊中心是什么

注冊中心可以說是微服務架構中的”通訊錄“,它記錄了服務和服務地址的映射關系。在分布式架構中,服務會注冊到這里,當服務需要調用其它服務時,就到這里找到服務的地址,進行調用。
在這里插入圖片描述

1.2. 為什么需要注冊中心

在分布式系統中,服務可能有上千個,然后每個服務都有好幾個實例,如果通過 ip + port 進行服務之間通信則會使系統變得難維護,并且還需要考慮其他復雜的問題:

  1. 服務注冊后,如何被及時發現
  2. 服務宕機后,如何及時下線
  3. 服務如何有效的水平擴展
  4. 如何獲取服務列表
  5. 注冊中心如何實現自身的高可用

2. Eureka

2.1. 世面上的流行的注冊中心

組件名稱 組件簡介
Zookeeper zookeeper是一個分布式協調工具,可以實現注冊中心功能
Eureka springcloud的注冊中心
Consul Consul 簡化了分布式環境中的服務的注冊和發現流程,國外比較流行
Nacos Nacos 致力于幫助您發現、配置和管理微服務。SpringCloudAlibaba

2.2. Eureka

https://github.com/Netflix/eureka
在這里插入圖片描述
服務注冊中心(可以是一個集群),對外暴露自己的地址

注冊中心有 Eureka Service, Eureka Client,Eureka Client又分為提供者和消費者;

(某一個服務既可以是提供者也可以是消費者)

服務提供者

  • 服務注冊: 啟動的時候會通過發送REST請求的方式將自己注冊到Eureka Server上,同時帶上了自身服務的一些元數據信息。
  • 服務續約: 在注冊完服務之后,服務提供者會維護一個心跳(默認30S) 用來持續告訴Eureka Server: "我還活著 ”
  • 服務下線: 當服務實例進行正常的關閉操作時,它會觸發一個服務下線的REST請求 給Eureka Server, 告訴服務注冊中心:“我要下線了 ”。

服務消費者

  • 獲取服務: 服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲 取上面注冊的服務清單,并且緩存在Eureka Client本地,默認緩存30秒 (eureka.client.registryFetchIntervalSeconds)。同時,為了性能考慮,Eureka Server也會維護一份只讀的服務清單緩存,該緩存每隔30秒更新一次。
  • 服務調用: 服務消費者在獲取服務清單后,通過服務名可以獲得具體提供服務的實例名和該實例的元數據信息。在進行服務調用的時候,優先訪問同處一個Zone中的服務提供方。

Eureka Server(服務注冊中心)

  • 失效剔除:【在關閉自我保護才有效】 默認每隔一段時間(默認為60秒) 將當前清單中超時(默認為90秒)沒有續約的服務剔除出去。

  • 自我保護: EurekaServer 在運行期間,如果在15分鐘內超過85%的客戶端節點都沒有正常的心跳(通常由于網絡不穩定導致)。 Eureka Server會將當前的實例注冊信息保護起來, 讓這些實例不會過期,盡可能保護這些注冊信息。此時會出現以下幾種情況:

    • Eureka Server不再從注冊列表中移除因為長時間沒收到心跳而應該過期的服務。
    • Eureka Server仍然能夠接受新服務的注冊和查詢請求,但是不會被同步到其它節點上,保證當前節點依然可用。
    • 當網絡穩定時,當前Eureka Server新的注冊信息會被同步到其它節點中。

    因此Eureka Server可以很好的應對因網絡故障導致部分節點失聯的情況,而不會像ZK那樣如果有一半不可用的情況會導 致整個集群不可用而變成癱瘓。

3.1. 服務注冊

3.1.1. Eureka-Client

啥時候會被注冊

  1. 當客戶端剛剛啟動的時候
  2. 當客戶端的instance信息發生改動
    在這里插入圖片描述

當我們的客戶端引入了Eureka-Client,當主方法啟動時,@SpringBootApplication會掃描所有的META-INF/spring.factories文件下的 xxxAutoConfiguration。這時候 EurekaClientAutoConfiguration 也會被加載。

上面這段代碼,很簡單,就是實例化了一個Bean,主要是這個Bean實現了SmartLifecycle, 當重寫方法 isAutoStartup() 返回值為true,會啟動start()方法。
下面可以詳細看看這個代碼。

EurekaClientAutoConfiguration.java
@Bean
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {

   // 重點代碼
   return new EurekaAutoServiceRegistration(context, registry, registration);
}

EurekaAutoServiceRegistration.java
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered { 
    
	@Override
	public void start() {
		// ...
		
		// 該實例還未啟動
		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
            // 重點;自動去注冊服務
			this.serviceRegistry.register(this.registration);
            // 發布 節點注冊事件
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
			this.running.set(true);
		}
	}
	
	@Override
	public boolean isAutoStartup() {
		return true;
	}
}


InstanceInfoReplicator.java
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    // ...
                    
                    // 調用run方法
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            return false;
        }
    } else {
        return false;
    }
}

public void run() {
    try {
         // 刷新實例信息。
        discoveryClient.refreshInstanceInfo();
        
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
        
            // 注冊自己的服務
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
    
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

第二種是當我們客戶端instance信息發生變化

private void initScheduledTasks() {
    //省略, 刷新緩存的定時器
    
    // 監聽instance的狀態變更
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }

        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {
            // ...
            // 調用方法
            instanceInfoReplicator.onDemandUpdate();
        }
    };

    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }

    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

總結 :服務注冊分為2種。
第一種: 當應用啟動的時候,如果應用開啟了自動注冊(默認開啟), 那么在自動配置類加載的時候,會通過EurekaAutoServiceRegistration實例化的時候,去改變instance的status,然后調用注冊。
第二種: 主要應用于啟動之后,當應用的信息發生改變之后,每40每秒執行一次的線程,檢測到了,也會自動去注冊一次。

DiscoveryClient.register()

DiscoveryClient.java

boolean register() throws Throwable {
    EurekaHttpResponse<Void> httpResponse;
    try {
        //發起HTTP請求
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        
    }
    return httpResponse.getStatusCode() == 204;
}

使用的Jersey框架來完成http的請求調用

AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    // 請求url
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                 // post請求;請求參數
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // ...
    }
}

POST 請求 Eureka-Server 的 apps/${APP_NAME} 接口,參數為 InstanceInfo ,實現注冊實例信息的注冊。

3.1.2. Eureka-Service

ApplicationResource.java
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // 參數校驗
    // ...
    // 重點代碼
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();
}

在這里插入圖片描述

瀏覽器發送 localhost:7001/eureka/apps

<applications>
    <versions__delta>1</versions__delta>
    # applicaitons的組成的hash
    <apps__hashcode>UP_3_</apps__hashcode>
    <application>
    # 應用名
    <name>CLOUD-PROVIDER-PAYMENT</name>
    # 實例
    <instance>
        # 實例ID需要唯一
        <instanceId>LAPTOP-RHVGCSK0:payment:8001</instanceId>
        <hostName>192.168.31.193</hostName>
        <app>CLOUD-PROVIDER-PAYMENT</app>
        <ipAddr>192.168.31.193</ipAddr>
        <status>UP</status>
        <overriddenstatus>UNKNOWN</overriddenstatus>
        <port enabled="true">8001</port>
        <securePort enabled="false">443</securePort>
        <countryId>1</countryId>
        <dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
            <name>MyOwn</name>
        </dataCenterInfo>
        <leaseInfo>
            <renewalIntervalInSecs>30</renewalIntervalInSecs>
            <durationInSecs>90</durationInSecs>
            <registrationTimestamp>1617633199552</registrationTimestamp>
            <lastRenewalTimestamp>1617633829600</lastRenewalTimestamp>
            <evictionTimestamp>0</evictionTimestamp>
            <serviceUpTimestamp>1617633199552</serviceUpTimestamp>
        </leaseInfo>
        <metadata>
            <management.port>8001</management.port>
        </metadata>
        <homePageUrl>http://192.168.31.193:8001/</homePageUrl>
        <statusPageUrl>http://192.168.31.193:8001/actuator/info</statusPageUrl>
        <healthCheckUrl>http://192.168.31.193:8001/actuator/health</healthCheckUrl>
        <vipAddress>cloud-provider-payment</vipAddress>
        <secureVipAddress>cloud-provider-payment</secureVipAddress>
        <isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
        <lastUpdatedTimestamp>1617633199552</lastUpdatedTimestamp>
        <lastDirtyTimestamp>1617633199491</lastDirtyTimestamp>
        <actionType>ADDED</actionType>
    </instance>
    </application>
    <application>
        <name>ORDER80</name>
        # 多個實例
        <instance>
            <instanceId>LAPTOP-RHVGCSK0:order:80</instanceId>
            <hostName>192.168.31.193</hostName>
            <app>ORDER80</app>
            <ipAddr>192.168.31.193</ipAddr>
            <status>UP</status>
            <port enabled="true">80</port>
            <securePort enabled="false">443</securePort>
            <countryId>1</countryId>
            <leaseInfo>
                <renewalIntervalInSecs>30</renewalIntervalInSecs>
                <durationInSecs>90</durationInSecs>
                <registrationTimestamp>1617633135195</registrationTimestamp>
                <lastRenewalTimestamp>1617633825249</lastRenewalTimestamp>
                <evictionTimestamp>0</evictionTimestamp>
                <serviceUpTimestamp>1617633135195</serviceUpTimestamp>
            </leaseInfo>
            <metadata>
                <management.port>80</management.port>
            </metadata>
            <homePageUrl>http://192.168.31.193:80/</homePageUrl>
            <statusPageUrl>http://192.168.31.193:80/actuator/info</statusPageUrl>
            <healthCheckUrl>http://192.168.31.193:80/actuator/health</healthCheckUrl>
            <vipAddress>order80</vipAddress>
            <secureVipAddress>order80</secureVipAddress>
            <isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
            <lastUpdatedTimestamp>1617633135195</lastUpdatedTimestamp>
            <lastDirtyTimestamp>1617633135119</lastDirtyTimestamp>
            <actionType>ADDED</actionType>
        </instance>
        <instance>
            <instanceId>LAPTOP-RHVGCSK0:order:81</instanceId>
            <hostName>192.168.31.193</hostName>
            <app>ORDER80</app>
            <ipAddr>192.168.31.193</ipAddr>
            <status>UP</status>
            <overriddenstatus>UNKNOWN</overriddenstatus>
            <port enabled="true">81</port>
            <securePort enabled="false">443</securePort>
            <countryId>1</countryId>
            <dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
            <name>MyOwn</name>
            </dataCenterInfo>
            <leaseInfo>
                <renewalIntervalInSecs>30</renewalIntervalInSecs>
                <durationInSecs>90</durationInSecs>
                <registrationTimestamp>1617631878936</registrationTimestamp>
                <lastRenewalTimestamp>1617633829226</lastRenewalTimestamp>
                <evictionTimestamp>0</evictionTimestamp>
                <serviceUpTimestamp>1617631878937</serviceUpTimestamp>
            </leaseInfo>
            <metadata>
                <management.port>81</management.port>
            </metadata>
            <homePageUrl>http://192.168.31.193:81/</homePageUrl>
            <statusPageUrl>http://192.168.31.193:81/actuator/info</statusPageUrl>
            <healthCheckUrl>http://192.168.31.193:81/actuator/health</healthCheckUrl>
            <vipAddress>order80</vipAddress>
            <secureVipAddress>order80</secureVipAddress>
            <isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
            <lastUpdatedTimestamp>1617631878937</lastUpdatedTimestamp>
            <lastDirtyTimestamp>1617631878931</lastDirtyTimestamp>
            <actionType>ADDED</actionType>
        </instance>
    </application>
</applications>

上面的register方法,最終調用的是PeerAwareInstanceRegistryImpl的方法

PeerAwareInstanceRegistryImpl.java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
   
    // 實例注冊
    super.register(info, leaseDuration, isReplication);
    // 復制到同等服務節點上去
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

需要先了解一下Lease這個對象,因為Eureka-Server最終處理注冊信息的時候,都會轉化為這個對象來處理。

public class Lease<T> {


    // 默認的過期時間 90s
    public static final int DEFAULT_DURATION_IN_SECS = 90;

    // 實例信息
    private T holder;
    
    // 服務剔除是時間,當服務下線的時候,會過來更新這個時間戳registrationTimestamp
    private long evictionTimestamp;
    // 服務注冊的時間
    private long registrationTimestamp;
    // 服務啟動時間 ,當客戶端在注冊的時候,instanceInfo的status為UP的時候,則更新這個 時間戳
    private long serviceUpTimestamp;
    // Make it volatile so that the expiration task would see this quicker
    // 最后更新時間,每次續約的時候,都會更新這個時間戳,在判斷實例是否過期時,需要用到這個屬性。
    private volatile long lastUpdateTimestamp;
    // 過期時間
    private long duration;

    /**
     * 服務是否過期
     */
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

服務注冊重要代碼

// eureka的注冊表
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 上讀鎖
        read.lock();
         // 通過服務名從本地MAP里面獲取當前服務列表。
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // 如果第一次進來,那么gMap為空,則創建一個ConcurrentHashMap放入到registry里面去
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent方法主要是在向ConcurrentHashMap中添加鍵—值對的時候,它會先判斷該鍵值對是否已經存在。
            // 如果不存在(新的entry),那么會向map中添加該鍵值對,并返回null。
            // 如果已經存在,那么不會覆蓋已有的值,直接返回已經存在的值。
            // 線程安全操作
            gMap =
            registry.putIfAbsent(registrant.getAppName(), gNewMap);
            // 表明map中確實不存在,則設置gMap為最新創建的那個
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // 從MAP中查詢已經存在的Lease信息 (比如第二次來)
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
       
        
        // 構建一個最新的Lease信息
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 如果該實例是第一次啟動,設置啟動啟動
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 放入本地Map中
        gMap.put(registrant.getId(), lease);
        
         // 設置注冊類型為添加
        registrant.setActionType(ActionType.ADDED);
        // 最近變更記錄隊列,記錄了實例的每次變化, 用于注冊信息的增量獲取、
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
         // 清除讀寫緩存
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
     
    } finally {
        read.unlock();
    }

3.2. 拉取服務列表

3.2.1. Eureka-Client

在這里插入圖片描述

還是在 initScheduledTasks() 初始化所有的定時任務 這個方法中:

 private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // 拉取服務30秒;每30秒刷新一次
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // 其他代碼 
 }

定時更新服務注冊列表線程CacheRefreshThread

class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
            // ...
            
            // 重要代碼,拉取服務列表
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
        } catch (Throwable e) {
           
        }
    }
}

由上可以看到,系統在啟動的時候,初始化了一個定時器,每30秒一次,用來刷新本地緩存信息。

獲得實例信息

/**
 * 客戶端的服務列表
 */
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // 取出之前獲取的服務列表
        Applications applications = getApplications();
        
        // 判斷多個條件,確定是否觸發全量更新,如下任一個滿足都會全量更新:
        // 1. 是否禁用增量
        // 2. 是否對某個region特別關注
        // 3. 外部傳參是否要全量拉取
        // 4. 本地緩存服務列表是否為empty
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            
            // 拉取全量信息
            getAndStoreFullRegistry();
        } else {
            // 拉取并更新增量信息
            getAndUpdateDelta(applications);
        }
        // 重新計算hash值
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    return true;
}

全量獲取

private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

 private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ?
            // 全量獲取
            eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
   
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 設置到本地緩存里面去
        localRegionApps.set(this.filterAndShuffle(apps));
    } else {
        // ...
    }
}

增量獲取

DiscoveryClient.java
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    
    // 增量獲取信息
    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    // 增量獲取為空,則全量返回
    if (delta == null) {
        getAndStoreFullRegistry();
    }
    // CAS
    else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        String reconcileHashCode = "";
        // 這里設置原子鎖的原因是怕某次調度網絡請求時間過長,導致同一時間有多線程拉取到增量信息并發修改
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                //  將獲取到的增量信息和本地緩存信息合并
                updateDelta(delta);
                // 計算本地的hash; ${status}_${count}_;DOWN _2_UP_100_
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            // ..
        }
        // 如果本地的hash與service的hash不一致,全量去拉取
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        // ...
    }
}
  1. 發起http請求,將服務端的客戶端變化的信息拉取過來,如: register, cancle, modify 有過這些操作的數據
  2. 上鎖,防止某次調度網絡請求時間過長,導致同一時間有多線程拉取到增量信息并發修改
  3. 將請求過來的增量數據和本地的數據做合并
  4. 計算hashCode
  5. 如果hashCode不一致,增量更新錯誤,則又會去服務端發起一次全量獲取

獲取注冊信息

在這里插入圖片描述

ApplicationsResource.java
@Path("/{version}/apps")
public class ApplicationsResource {
// 省略代碼

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {

    // ...
    // 構建全量數據緩存key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    
    // 重點代碼
    response = Response.ok(responseCache.get(cacheKey))
            .build();
    
    return response;
}

responseCache.get(cacheKey)從緩存中獲取

// ...

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        // 是否使用只讀緩存,默認為true
        if (useReadOnlyCache) {
            // 從緩存中獲取數據
            final Value currentPayload = readOnlyCacheMap.get(key);
            // 如果不為空,直接返回數據
            if (currentPayload != null) {
                payload = currentPayload;
            }
            // 如果為空
            else {
                // 從讀寫讀寫中獲取
                payload = readWriteCacheMap.get(key);
                // 同時將數據放入只讀緩存中
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            // 從讀寫讀寫中獲取
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
    }
    return payload;
}
// 只讀緩存
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// 讀寫緩存
private final LoadingCache<Key, Value> readWriteCacheMap;

// 在構造器中實現邏輯
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    // 是否使用只讀緩存,默認為true
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // 緩存更新的時間間隔,默認為30s
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    // 讀寫緩存構造,使用Google的CacheBuilder緩存
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                   // 過期180s
                   .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                        }
                    })
                    // 緩存加載器,當緩存不存在時,會自動執行load方法,進行緩存加載。同時返回緩存數據
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            // 加載數據
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // 是否使用只讀緩存,如果使用,此處則啟動一個定時器,默認每隔30s用來復制readWriteCacheMap 的數據至readOnlyCacheMap
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }
}

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            // 遍歷只讀緩存中的key
            for (Key key : readOnlyCacheMap.keySet()) {
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    // 從讀寫緩存中獲得數據
                    Value cacheValue = readWriteCacheMap.get(key);
                    // 從只讀緩存中獲得數據
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // 如果兩者不一致,以讀寫緩存為準,覆蓋只讀緩存的數據
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                }
            }
        }
    };
}

總結:

在拉取注冊的時候:

  1. 首先從ReadOnlyCacheMap里查緩存的注冊表;
  2. 若沒有,就找ReadWriteCacheMap里緩存的注冊表;
  3. 如果還沒有,就從內存中獲取實際的注冊表數據。

在注冊表發生時候:

  1. 會在內存中更新變更的注冊表數據,同時過期掉ReadWriteCacheMap;
  2. 此過程不會影響ReadOnlyCacheMap提供人家查詢注冊表;
  3. ReadOnlyCacheMap 默認30秒會從ReadWriteCacheMap中更新數據;
  4. ReadWriteCacheMap 默認是180秒數據會失效。
  5. 下次有服務拉取列表或者是ReadOnlyCacheMap更新時, 如果緩存沒有命中,都會去注冊表重新獲取最新的值。

多級緩存的優點

  1. 盡可能保證了內存注冊表數據不會出現頻繁的讀寫沖突問題;
  2. 進一步保證了對eurekaService的大量請求,都是快速走純內存。【如我們公司】

參考文檔 :
https://github.com/Netflix/eureka
https://www.iocoder.cn/categories/Eureka/

原文鏈接:https://blog.csdn.net/One_hundred_nice/article/details/123018451

欄目分類
最近更新