網站首頁 編程語言 正文
1. 背景
1.1. 注冊中心是什么
注冊中心可以說是微服務架構中的”通訊錄“,它記錄了服務和服務地址的映射關系。在分布式架構中,服務會注冊到這里,當服務需要調用其它服務時,就到這里找到服務的地址,進行調用。
1.2. 為什么需要注冊中心
在分布式系統中,服務可能有上千個,然后每個服務都有好幾個實例,如果通過 ip + port 進行服務之間通信則會使系統變得難維護,并且還需要考慮其他復雜的問題:
- 服務注冊后,如何被及時發現
- 服務宕機后,如何及時下線
- 服務如何有效的水平擴展
- 如何獲取服務列表
- 注冊中心如何實現自身的高可用
2. Eureka
2.1. 世面上的流行的注冊中心
組件名稱 | 組件簡介 |
---|---|
Zookeeper | zookeeper是一個分布式協調工具,可以實現注冊中心功能 |
Eureka | springcloud的注冊中心 |
Consul | Consul 簡化了分布式環境中的服務的注冊和發現流程,國外比較流行 |
Nacos | Nacos 致力于幫助您發現、配置和管理微服務。SpringCloudAlibaba |
2.2. Eureka
https://github.com/Netflix/eureka
服務注冊中心(可以是一個集群),對外暴露自己的地址
注冊中心有 Eureka Service, Eureka Client,Eureka Client又分為提供者和消費者;
(某一個服務既可以是提供者也可以是消費者)
服務提供者
- 服務注冊: 啟動的時候會通過發送REST請求的方式將自己注冊到Eureka Server上,同時帶上了自身服務的一些元數據信息。
- 服務續約: 在注冊完服務之后,服務提供者會維護一個心跳(默認30S) 用來持續告訴Eureka Server: "我還活著 ”
- 服務下線: 當服務實例進行正常的關閉操作時,它會觸發一個服務下線的REST請求 給Eureka Server, 告訴服務注冊中心:“我要下線了 ”。
服務消費者
- 獲取服務: 服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲 取上面注冊的服務清單,并且緩存在Eureka Client本地,默認緩存30秒 (eureka.client.registryFetchIntervalSeconds)。同時,為了性能考慮,Eureka Server也會維護一份只讀的服務清單緩存,該緩存每隔30秒更新一次。
- 服務調用: 服務消費者在獲取服務清單后,通過服務名可以獲得具體提供服務的實例名和該實例的元數據信息。在進行服務調用的時候,優先訪問同處一個Zone中的服務提供方。
Eureka Server(服務注冊中心)
-
失效剔除:【在關閉自我保護才有效】 默認每隔一段時間(默認為60秒) 將當前清單中超時(默認為90秒)沒有續約的服務剔除出去。
-
自我保護: EurekaServer 在運行期間,如果在15分鐘內超過85%的客戶端節點都沒有正常的心跳(通常由于網絡不穩定導致)。 Eureka Server會將當前的實例注冊信息保護起來, 讓這些實例不會過期,盡可能保護這些注冊信息。此時會出現以下幾種情況:
- Eureka Server不再從注冊列表中移除因為長時間沒收到心跳而應該過期的服務。
- Eureka Server仍然能夠接受新服務的注冊和查詢請求,但是不會被同步到其它節點上,保證當前節點依然可用。
- 當網絡穩定時,當前Eureka Server新的注冊信息會被同步到其它節點中。
因此Eureka Server可以很好的應對因網絡故障導致部分節點失聯的情況,而不會像ZK那樣如果有一半不可用的情況會導 致整個集群不可用而變成癱瘓。
3.1. 服務注冊
3.1.1. Eureka-Client
啥時候會被注冊
- 當客戶端剛剛啟動的時候
- 當客戶端的instance信息發生改動
當我們的客戶端引入了Eureka-Client,當主方法啟動時,@SpringBootApplication會掃描所有的META-INF/spring.factories文件下的 xxxAutoConfiguration。這時候 EurekaClientAutoConfiguration 也會被加載。
上面這段代碼,很簡單,就是實例化了一個Bean,主要是這個Bean實現了SmartLifecycle, 當重寫方法 isAutoStartup() 返回值為true,會啟動start()方法。
下面可以詳細看看這個代碼。
EurekaClientAutoConfiguration.java
@Bean
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
// 重點代碼
return new EurekaAutoServiceRegistration(context, registry, registration);
}
EurekaAutoServiceRegistration.java
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
@Override
public void start() {
// ...
// 該實例還未啟動
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// 重點;自動去注冊服務
this.serviceRegistry.register(this.registration);
// 發布 節點注冊事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
@Override
public boolean isAutoStartup() {
return true;
}
}
InstanceInfoReplicator.java
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
// ...
// 調用run方法
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
return false;
}
} else {
return false;
}
}
public void run() {
try {
// 刷新實例信息。
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 注冊自己的服務
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
第二種是當我們客戶端instance信息發生變化
private void initScheduledTasks() {
//省略, 刷新緩存的定時器
// 監聽instance的狀態變更
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
// ...
// 調用方法
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
總結 :服務注冊分為2種。
第一種: 當應用啟動的時候,如果應用開啟了自動注冊(默認開啟), 那么在自動配置類加載的時候,會通過EurekaAutoServiceRegistration實例化的時候,去改變instance的status,然后調用注冊。
第二種: 主要應用于啟動之后,當應用的信息發生改變之后,每40每秒執行一次的線程,檢測到了,也會自動去注冊一次。
DiscoveryClient.register()
DiscoveryClient.java
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
//發起HTTP請求
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
}
return httpResponse.getStatusCode() == 204;
}
使用的Jersey框架來完成http的請求調用
AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
// 請求url
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
// post請求;請求參數
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
// ...
}
}
POST 請求 Eureka-Server 的 apps/${APP_NAME} 接口,參數為 InstanceInfo ,實現注冊實例信息的注冊。
3.1.2. Eureka-Service
ApplicationResource.java
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// 參數校驗
// ...
// 重點代碼
registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
}
瀏覽器發送 localhost:7001/eureka/apps
<applications>
<versions__delta>1</versions__delta>
# applicaitons的組成的hash
<apps__hashcode>UP_3_</apps__hashcode>
<application>
# 應用名
<name>CLOUD-PROVIDER-PAYMENT</name>
# 實例
<instance>
# 實例ID需要唯一
<instanceId>LAPTOP-RHVGCSK0:payment:8001</instanceId>
<hostName>192.168.31.193</hostName>
<app>CLOUD-PROVIDER-PAYMENT</app>
<ipAddr>192.168.31.193</ipAddr>
<status>UP</status>
<overriddenstatus>UNKNOWN</overriddenstatus>
<port enabled="true">8001</port>
<securePort enabled="false">443</securePort>
<countryId>1</countryId>
<dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
<name>MyOwn</name>
</dataCenterInfo>
<leaseInfo>
<renewalIntervalInSecs>30</renewalIntervalInSecs>
<durationInSecs>90</durationInSecs>
<registrationTimestamp>1617633199552</registrationTimestamp>
<lastRenewalTimestamp>1617633829600</lastRenewalTimestamp>
<evictionTimestamp>0</evictionTimestamp>
<serviceUpTimestamp>1617633199552</serviceUpTimestamp>
</leaseInfo>
<metadata>
<management.port>8001</management.port>
</metadata>
<homePageUrl>http://192.168.31.193:8001/</homePageUrl>
<statusPageUrl>http://192.168.31.193:8001/actuator/info</statusPageUrl>
<healthCheckUrl>http://192.168.31.193:8001/actuator/health</healthCheckUrl>
<vipAddress>cloud-provider-payment</vipAddress>
<secureVipAddress>cloud-provider-payment</secureVipAddress>
<isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
<lastUpdatedTimestamp>1617633199552</lastUpdatedTimestamp>
<lastDirtyTimestamp>1617633199491</lastDirtyTimestamp>
<actionType>ADDED</actionType>
</instance>
</application>
<application>
<name>ORDER80</name>
# 多個實例
<instance>
<instanceId>LAPTOP-RHVGCSK0:order:80</instanceId>
<hostName>192.168.31.193</hostName>
<app>ORDER80</app>
<ipAddr>192.168.31.193</ipAddr>
<status>UP</status>
<port enabled="true">80</port>
<securePort enabled="false">443</securePort>
<countryId>1</countryId>
<leaseInfo>
<renewalIntervalInSecs>30</renewalIntervalInSecs>
<durationInSecs>90</durationInSecs>
<registrationTimestamp>1617633135195</registrationTimestamp>
<lastRenewalTimestamp>1617633825249</lastRenewalTimestamp>
<evictionTimestamp>0</evictionTimestamp>
<serviceUpTimestamp>1617633135195</serviceUpTimestamp>
</leaseInfo>
<metadata>
<management.port>80</management.port>
</metadata>
<homePageUrl>http://192.168.31.193:80/</homePageUrl>
<statusPageUrl>http://192.168.31.193:80/actuator/info</statusPageUrl>
<healthCheckUrl>http://192.168.31.193:80/actuator/health</healthCheckUrl>
<vipAddress>order80</vipAddress>
<secureVipAddress>order80</secureVipAddress>
<isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
<lastUpdatedTimestamp>1617633135195</lastUpdatedTimestamp>
<lastDirtyTimestamp>1617633135119</lastDirtyTimestamp>
<actionType>ADDED</actionType>
</instance>
<instance>
<instanceId>LAPTOP-RHVGCSK0:order:81</instanceId>
<hostName>192.168.31.193</hostName>
<app>ORDER80</app>
<ipAddr>192.168.31.193</ipAddr>
<status>UP</status>
<overriddenstatus>UNKNOWN</overriddenstatus>
<port enabled="true">81</port>
<securePort enabled="false">443</securePort>
<countryId>1</countryId>
<dataCenterInfo class="com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo">
<name>MyOwn</name>
</dataCenterInfo>
<leaseInfo>
<renewalIntervalInSecs>30</renewalIntervalInSecs>
<durationInSecs>90</durationInSecs>
<registrationTimestamp>1617631878936</registrationTimestamp>
<lastRenewalTimestamp>1617633829226</lastRenewalTimestamp>
<evictionTimestamp>0</evictionTimestamp>
<serviceUpTimestamp>1617631878937</serviceUpTimestamp>
</leaseInfo>
<metadata>
<management.port>81</management.port>
</metadata>
<homePageUrl>http://192.168.31.193:81/</homePageUrl>
<statusPageUrl>http://192.168.31.193:81/actuator/info</statusPageUrl>
<healthCheckUrl>http://192.168.31.193:81/actuator/health</healthCheckUrl>
<vipAddress>order80</vipAddress>
<secureVipAddress>order80</secureVipAddress>
<isCoordinatingDiscoveryServer>false</isCoordinatingDiscoveryServer>
<lastUpdatedTimestamp>1617631878937</lastUpdatedTimestamp>
<lastDirtyTimestamp>1617631878931</lastDirtyTimestamp>
<actionType>ADDED</actionType>
</instance>
</application>
</applications>
上面的register方法,最終調用的是PeerAwareInstanceRegistryImpl的方法
PeerAwareInstanceRegistryImpl.java
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 實例注冊
super.register(info, leaseDuration, isReplication);
// 復制到同等服務節點上去
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
需要先了解一下Lease這個對象,因為Eureka-Server最終處理注冊信息的時候,都會轉化為這個對象來處理。
public class Lease<T> {
// 默認的過期時間 90s
public static final int DEFAULT_DURATION_IN_SECS = 90;
// 實例信息
private T holder;
// 服務剔除是時間,當服務下線的時候,會過來更新這個時間戳registrationTimestamp
private long evictionTimestamp;
// 服務注冊的時間
private long registrationTimestamp;
// 服務啟動時間 ,當客戶端在注冊的時候,instanceInfo的status為UP的時候,則更新這個 時間戳
private long serviceUpTimestamp;
// Make it volatile so that the expiration task would see this quicker
// 最后更新時間,每次續約的時候,都會更新這個時間戳,在判斷實例是否過期時,需要用到這個屬性。
private volatile long lastUpdateTimestamp;
// 過期時間
private long duration;
/**
* 服務是否過期
*/
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
服務注冊重要代碼
// eureka的注冊表
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 上讀鎖
read.lock();
// 通過服務名從本地MAP里面獲取當前服務列表。
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 如果第一次進來,那么gMap為空,則創建一個ConcurrentHashMap放入到registry里面去
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
// putIfAbsent方法主要是在向ConcurrentHashMap中添加鍵—值對的時候,它會先判斷該鍵值對是否已經存在。
// 如果不存在(新的entry),那么會向map中添加該鍵值對,并返回null。
// 如果已經存在,那么不會覆蓋已有的值,直接返回已經存在的值。
// 線程安全操作
gMap =
registry.putIfAbsent(registrant.getAppName(), gNewMap);
// 表明map中確實不存在,則設置gMap為最新創建的那個
if (gMap == null) {
gMap = gNewMap;
}
}
// 從MAP中查詢已經存在的Lease信息 (比如第二次來)
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 構建一個最新的Lease信息
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 如果該實例是第一次啟動,設置啟動啟動
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 放入本地Map中
gMap.put(registrant.getId(), lease);
// 設置注冊類型為添加
registrant.setActionType(ActionType.ADDED);
// 最近變更記錄隊列,記錄了實例的每次變化, 用于注冊信息的增量獲取、
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清除讀寫緩存
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
read.unlock();
}
3.2. 拉取服務列表
3.2.1. Eureka-Client
還是在 initScheduledTasks() 初始化所有的定時任務 這個方法中:
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 拉取服務30秒;每30秒刷新一次
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 其他代碼
}
定時更新服務注冊列表線程CacheRefreshThread
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
// ...
// 重要代碼,拉取服務列表
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
} catch (Throwable e) {
}
}
}
由上可以看到,系統在啟動的時候,初始化了一個定時器,每30秒一次,用來刷新本地緩存信息。
獲得實例信息
/**
* 客戶端的服務列表
*/
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 取出之前獲取的服務列表
Applications applications = getApplications();
// 判斷多個條件,確定是否觸發全量更新,如下任一個滿足都會全量更新:
// 1. 是否禁用增量
// 2. 是否對某個region特別關注
// 3. 外部傳參是否要全量拉取
// 4. 本地緩存服務列表是否為empty
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// 拉取全量信息
getAndStoreFullRegistry();
} else {
// 拉取并更新增量信息
getAndUpdateDelta(applications);
}
// 重新計算hash值
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
} finally {
if (tracer != null) {
tracer.stop();
}
}
return true;
}
全量獲取
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
?
// 全量獲取
eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 設置到本地緩存里面去
localRegionApps.set(this.filterAndShuffle(apps));
} else {
// ...
}
}
增量獲取
DiscoveryClient.java
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
// 增量獲取信息
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// 增量獲取為空,則全量返回
if (delta == null) {
getAndStoreFullRegistry();
}
// CAS
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
String reconcileHashCode = "";
// 這里設置原子鎖的原因是怕某次調度網絡請求時間過長,導致同一時間有多線程拉取到增量信息并發修改
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 將獲取到的增量信息和本地緩存信息合并
updateDelta(delta);
// 計算本地的hash; ${status}_${count}_;DOWN _2_UP_100_
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
// ..
}
// 如果本地的hash與service的hash不一致,全量去拉取
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
// ...
}
}
- 發起http請求,將服務端的客戶端變化的信息拉取過來,如: register, cancle, modify 有過這些操作的數據
- 上鎖,防止某次調度網絡請求時間過長,導致同一時間有多線程拉取到增量信息并發修改
- 將請求過來的增量數據和本地的數據做合并
- 計算hashCode
- 如果hashCode不一致,增量更新錯誤,則又會去服務端發起一次全量獲取
獲取注冊信息
ApplicationsResource.java
@Path("/{version}/apps")
public class ApplicationsResource {
// 省略代碼
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// ...
// 構建全量數據緩存key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
// 重點代碼
response = Response.ok(responseCache.get(cacheKey))
.build();
return response;
}
responseCache.get(cacheKey)從緩存中獲取
// ...
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
// 是否使用只讀緩存,默認為true
if (useReadOnlyCache) {
// 從緩存中獲取數據
final Value currentPayload = readOnlyCacheMap.get(key);
// 如果不為空,直接返回數據
if (currentPayload != null) {
payload = currentPayload;
}
// 如果為空
else {
// 從讀寫讀寫中獲取
payload = readWriteCacheMap.get(key);
// 同時將數據放入只讀緩存中
readOnlyCacheMap.put(key, payload);
}
} else {
// 從讀寫讀寫中獲取
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
}
return payload;
}
// 只讀緩存
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// 讀寫緩存
private final LoadingCache<Key, Value> readWriteCacheMap;
// 在構造器中實現邏輯
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
// 是否使用只讀緩存,默認為true
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
// 緩存更新的時間間隔,默認為30s
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// 讀寫緩存構造,使用Google的CacheBuilder緩存
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
// 過期180s
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
}
})
// 緩存加載器,當緩存不存在時,會自動執行load方法,進行緩存加載。同時返回緩存數據
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
// 加載數據
Value value = generatePayload(key);
return value;
}
});
// 是否使用只讀緩存,如果使用,此處則啟動一個定時器,默認每隔30s用來復制readWriteCacheMap 的數據至readOnlyCacheMap
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
// 遍歷只讀緩存中的key
for (Key key : readOnlyCacheMap.keySet()) {
try {
CurrentRequestVersion.set(key.getVersion());
// 從讀寫緩存中獲得數據
Value cacheValue = readWriteCacheMap.get(key);
// 從只讀緩存中獲得數據
Value currentCacheValue = readOnlyCacheMap.get(key);
// 如果兩者不一致,以讀寫緩存為準,覆蓋只讀緩存的數據
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
}
}
}
};
}
總結:
在拉取注冊的時候:
- 首先從ReadOnlyCacheMap里查緩存的注冊表;
- 若沒有,就找ReadWriteCacheMap里緩存的注冊表;
- 如果還沒有,就從內存中獲取實際的注冊表數據。
在注冊表發生時候:
- 會在內存中更新變更的注冊表數據,同時過期掉ReadWriteCacheMap;
- 此過程不會影響ReadOnlyCacheMap提供人家查詢注冊表;
- ReadOnlyCacheMap 默認30秒會從ReadWriteCacheMap中更新數據;
- ReadWriteCacheMap 默認是180秒數據會失效。
- 下次有服務拉取列表或者是ReadOnlyCacheMap更新時, 如果緩存沒有命中,都會去注冊表重新獲取最新的值。
多級緩存的優點:
- 盡可能保證了內存注冊表數據不會出現頻繁的讀寫沖突問題;
- 進一步保證了對eurekaService的大量請求,都是快速走純內存。【如我們公司】
參考文檔 :
https://github.com/Netflix/eureka
https://www.iocoder.cn/categories/Eureka/
原文鏈接:https://blog.csdn.net/One_hundred_nice/article/details/123018451
相關推薦
- 2023-05-23 numpy增加維度、刪除維度的方法_python
- 2022-10-22 Python中的Unittest基本使用_python
- 2022-06-15 C#實現冒泡排序和插入排序算法_C#教程
- 2021-11-28 C/C++中的?Qt?StandardItemModel?數據模型應用解析_C 語言
- 2022-09-09 React前端DOM常見Hook封裝示例上_React
- 2022-12-10 Flutter改變狀態變量是否必須寫在setState回調詳解_Android
- 2022-06-28 python神經網絡使用Keras進行模型的保存與讀取_python
- 2024-04-03 Map+函數式接口(替換if-else)
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支