日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

apollo配置中心的client端分析

作者:法拉弟弟 更新時間: 2022-08-15 編程語言

文章目錄

  • apollo簡介
  • 整體設計
  • apollo client分析
    • client分析
      • 啟動流程
      • ConfigServiceLocator
      • RemoteConfigLongPollService
      • RemoteConfigRepository
      • SpringValueRegistry

apollo簡介

apollo是攜程開源的一款配置中心的產品。什么是配置中心呢?我們在開發的過程中最簡單的就是在代碼中hard coding,寫好之后直接部署到生產環境,但是這樣的弊端是每次修改一個簡單的配置,就需要重新改代碼重新上線,極大的浪費人力。apollo的作用正是在不上線的情況下可以動態實時的修改系統的配置數據。

整體設計

image-20220703121317472

在分析之前,我們需要對apollo的整體架構有一個大概的了解,因為我們下面對各個模塊進行簡單的介紹。

  • ConfigService

    • 提供配置獲取接口
    • 提供配置推送接口
    • 服務于Apollo客戶端
  • AdminService

    • 提供配置管理接口
    • 提供配置修改發布接口
    • 服務于管理界面Portal
  • Client

    • 為應用獲取配置,支持實時更新
    • 通過MetaServer獲取ConfigService的服務列表
    • 使用客戶端軟負載SLB方式調用ConfigService
  • Portal

    • 配置管理頁面
    • 通過MetaServer獲取AdminService的服務列表
    • 使用客戶端軟負載SLB方式調用AdminService

輔助模塊:

  • Eureka

    • 用于服務發現和注冊
    • Config/AdminService注冊實例并定期報心跳
    • 和ConfigService一起部署
  • MetaServer

    • Portal通過域名訪問MetaServer獲取AdminService的地址列表
    • Client通過域名訪問MetaServer獲取ConfigService的地址列表
    • 相當于Eureka Proxy
    • 和ConfigService一起部署
  • NginxLB

    • 和域名系統配合,協助Portal訪問MetaServer獲取AdminService的地址列表
    • 和域名系統配合,協助Client訪問MetaServer獲取ConfigService的地址列表
    • 和域名系統配置,協助用戶訪問Portal進行配置管理。

要點:

  1. ConfigService是一個獨立的微服務,服務于Client進行配置獲取。
  2. Client和ConfigService保持長連接,通過一種推拉結合(push & pull)的模式,在實現配置實時更新的同時,保證配置更新不丟失。
  3. AdminService是一個獨立的微服務,服務于Portal進行配置管理。Portal通過調用AdminService進行配置管理和發布。
  4. ConfigService和AdminService共享ConfigDB,ConfigDB中存放項目在某個環境中的配置信息。ConfigService/AdminService/ConfigDB三者在每個環境(DEV/FAT/UAT/PRO)中都要部署一份。
  5. Protal有一個獨立的PortalDB,存放用戶權限、項目和配置的元數據信息。Protal只需部署一份,它可以管理多套環境。

apollo client分析

上一節我們談了apollo配置中心的整體架構,但是由于設計的比較全面,不是太好理解,我們通過下面的簡化版進行說明:

image-20220703205937728

需要配置的數據會通過portal調用admin service將數據存儲在DB中。client是我們的業務系統,可以實時的從config service獲取最新的配置數據,而在apollo中,獲取配置數據有推拉結合的方式。

client分析

客戶端總共有四個后臺線程

image-20220707093912867

啟動流程

  • Spring啟動
    • 調用 ApolloApplicationContextInitializer
      • 通過spi方式開始初始化
      • ConfigService ->ConfigManager ->ConfigFactory ->Config ->ConfigRepository
        • 首次初始化時會同步獲取一次資源 RemoteConfigRepository
          • 同步調用一次 this.trySync();
          • 開啟異步定時任務 this.schedulePeriodicRefresh();
          • 開啟長輪詢,及時獲取配置調整 this.scheduleLongPollingRefresh();
    • bean初始化前階段 - SpringValueProcessor
      • 將配置與bean的關系注冊到SpringValueRegistry中

在springboot項目中,通過@EnableApolloConfig啟動apollo client。

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({ApolloConfigRegistrar.class})
public @interface EnableApolloConfig {
    String[] value() default {"application"};

    int order() default 2147483647;
}
public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar {
    public ApolloConfigRegistrar() {
    }

    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName()));
        String[] namespaces = attributes.getStringArray("value");
        int order = (Integer)attributes.getNumber("order");
        PropertySourcesProcessor.addNamespaces(Lists.newArrayList(namespaces), order);
        Map<String, Object> propertySourcesPlaceholderPropertyValues = new HashMap();
        propertySourcesPlaceholderPropertyValues.put("order", 0);
        BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesPlaceholderConfigurer.class.getName(), PropertySourcesPlaceholderConfigurer.class, propertySourcesPlaceholderPropertyValues);
        BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesProcessor.class.getName(), PropertySourcesProcessor.class);
        BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloAnnotationProcessor.class.getName(), ApolloAnnotationProcessor.class);
        BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueProcessor.class.getName(), SpringValueProcessor.class);
        BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueDefinitionProcessor.class.getName(), SpringValueDefinitionProcessor.class);
        BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloJsonValueProcessor.class.getName(), ApolloJsonValueProcessor.class);
    }
}

enable注解引入了ApolloConfigRegistrar,在該類中注冊了多個apollo必須的BeanDefination(后續會被Spring初始化為bean)。主要有PropertySourcesPlaceholderConfigurer、PropertySourcesProcessor、ApolloAnnotationProcessor、SpringValueProcessor、SpringValueDefinitionProcessor、ApolloJsonValueProcessor。

ConfigServiceLocator

image-20220704200522969

public ConfigServiceLocator() {
    List<ServiceDTO> initial = Lists.newArrayList();
    this.m_configServices = new AtomicReference(initial);
    this.m_responseType = (new TypeToken<List<ServiceDTO>>() {
    }).getType();
    this.m_httpUtil = (HttpUtil)ApolloInjector.getInstance(HttpUtil.class);
    this.m_configUtil = (ConfigUtil)ApolloInjector.getInstance(ConfigUtil.class);
    this.m_executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ConfigServiceLocator", true));
    this.initConfigServices();
}

PropertySourcesProcessor的postProcessBeanFactory作為入口會執行ConfigServiceLocator的初始化,在初始化方法里,會初始化

m_executorService。

private void schedulePeriodicRefresh() {
  //啟動固定頻率的定時任務
  this.m_executorService.scheduleAtFixedRate(
      new Runnable() {
        @Override
        public void run() {
          logger.debug("refresh config services");
          Tracer.logEvent("Apollo.MetaService", "periodicRefresh");
          tryUpdateConfigServices();
        }
        //這里是間隔的執行時間,默認是5min
      }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
      m_configUtil.getRefreshIntervalTimeUnit());
}
private synchronized void updateConfigServices() {
  //拼接到url http://xxx.config.apollo.xxx.com/services/config?appId=my-server-name&ip=10.xx.xxx.xxx
  String url = assembleMetaServiceUrl();

  HttpRequest request = new HttpRequest(url);
  int maxRetries = 2;
  Throwable exception = null;

  for (int i = 0; i < maxRetries; i++) {
    Transaction transaction = Tracer.newTransaction("Apollo.MetaService", "getConfigService");
    transaction.addData("Url", url);
    try {
      //通過http請求,返回的ServiceDTO結構中包含appName、instanceId、homepageUrl
      HttpResponse<List<ServiceDTO>> response = m_httpClient.doGet(request, m_responseType);
      transaction.setStatus(Transaction.SUCCESS);
      List<ServiceDTO> services = response.getBody();
      if (services == null || services.isEmpty()) {
        logConfigService("Empty response!");
        continue;
      }
      setConfigServices(services);
      return;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      transaction.setStatus(ex);
      exception = ex;
    } finally {
      transaction.complete();
    }

    try {
      m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(m_configUtil.getOnErrorRetryInterval());
    } catch (InterruptedException ex) {
      //ignore
    }
  }

}

該方法的主要作用就是定時拉取服務配置

用ApolloInjector做依賴管理

RemoteConfigRepository定時輪詢Config Service

RemoteConfigLongPollService ,長輪詢Config Service的配置變更通知/notifications/v2接口

image-20220705100059884

  • 一個Namespace對應一個RemoteConfigRepository
  • 多個RemoteConfigRepository注冊到全局唯一的RemoteConfigLongPollService中

RemoteConfigLongPollService

上面我們介紹過,apollo獲取最新配置是通過推拉結合的方式,而推的方式主要是通過長輪詢實現的,這個后臺線程就是長輪詢的實現。大體步驟如下:

  1. 客戶端發起一個Http請求到服務端,設置超時時間為90秒
  2. 服務端會保持住這個連接60秒
  3. 如果在60秒內有客戶端關心的配置變化,被保持住的客戶端請求會立即返回,并告知客戶端有配置變化的namespace信息,客戶端會據此拉取對應namespace的最新配置
  4. 如果在60秒內沒有客戶端關心的配置變化,那么會返回Http狀態碼304給客戶端
  5. 客戶端在收到服務端請求后會立即重新發起連接,回到第一步
    考慮到會有數萬客戶端向服務端發起長連,在服務端使用了async servlet(Spring DeferredResult)來服務Http Long Polling請求。
//客戶端發起請求的代碼
RemoteConfigLongPollServiceprivate void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
  ServiceDTO lastServiceDto = null;
  //這就是各while循環
  while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
    if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
      //wait at most 5 seconds
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
    }
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
    String url = null;
    try {
      if (lastServiceDto == null) {
        lastServiceDto = this.resolveConfigService();
      }

      url =
          assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
              m_notifications);

      logger.debug("Long polling from {}", url);

      HttpRequest request = new HttpRequest(url);
      request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
      if (!StringUtils.isBlank(secret)) {
        Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
        request.setHeaders(headers);
      }

      transaction.addData("Url", url);

      final HttpResponse<List<ApolloConfigNotification>> response =
          m_httpClient.doGet(request, m_responseType);

      logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
      if (response.getStatusCode() == 200 && response.getBody() != null) {
        updateNotifications(response.getBody());
        updateRemoteNotifications(response.getBody());
        transaction.addData("Result", response.getBody().toString());
        notify(lastServiceDto, response.getBody());
      }

      //try to load balance
      if (response.getStatusCode() == 304 && ThreadLocalRandom.current().nextBoolean()) {
        lastServiceDto = null;
      }

      m_longPollFailSchedulePolicyInSecond.success();
      transaction.addData("StatusCode", response.getStatusCode());
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      lastServiceDto = null;
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      transaction.setStatus(ex);
      long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
      logger.warn(
          "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
          sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
      try {
        TimeUnit.SECONDS.sleep(sleepTimeInSecond);
      } catch (InterruptedException ie) {
        //ignore
      }
    } finally {
      transaction.complete();
    }
  }
}

客戶端發送一個http請求到服務端的/notifications/v2接口。服務端不會立刻返回,而是通過Spring DeferredResult把請求掛起,如果60s內沒有該客戶端關心的配置發布,那么會返回http狀態碼304給客戶端,為什么是60s呢?這個不重要,但重要的是這個時間要小于客戶端設置的超時時間90s,否則客戶端會經常timeout。若該客戶端關心的配置有更新,則會立刻返回。客戶端從返回的結果中獲取到配置變化的namespace后,會立刻請求config service獲取該namespace的最新配置。

下面我們簡單看下服務端的代碼:

NotificationControllerV2public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
    @RequestParam(value = "appId") String appId,
    @RequestParam(value = "cluster") String cluster,
    @RequestParam(value = "notifications") String notificationsAsString,
    @RequestParam(value = "dataCenter", required = false) String dataCenter,
    @RequestParam(value = "ip", required = false) String clientIp) {
  List<ApolloConfigNotification> notifications = null;

  try {
    notifications =
        gson.fromJson(notificationsAsString, notificationsTypeReference);
  } catch (Throwable ex) {
    Tracer.logError(ex);
  }

  if (CollectionUtils.isEmpty(notifications)) {
    throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
  }
  
  Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);

  if (CollectionUtils.isEmpty(filteredNotifications)) {
    throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
  }
  
  DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
  Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
  Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());
  
  for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
    String normalizedNamespace = notificationEntry.getKey();
    ApolloConfigNotification notification = notificationEntry.getValue();
    namespaces.add(normalizedNamespace);
    clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
    if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
      deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
    }
  }

  Multimap<String, String> watchedKeysMap =
      watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);

  Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());

  /**
   * 1、set deferredResult before the check, for avoid more waiting
   * If the check before setting deferredResult,it may receive a notification the next time
   * when method handleMessage is executed between check and set deferredResult.
   */
  deferredResultWrapper
        .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));

  deferredResultWrapper.onCompletion(() -> {
    //unregister all keys
    for (String key : watchedKeys) {
      deferredResults.remove(key, deferredResultWrapper);
    }
    logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
  });

  //register all keys
  for (String key : watchedKeys) {
    this.deferredResults.put(key, deferredResultWrapper);
  }

  logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
  logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
      watchedKeys, appId, cluster, namespaces, dataCenter);

  /**
   * 2、check new release
   */
  List<ReleaseMessage> latestReleaseMessages =
      releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

  /**
   * Manually close the entity manager.
   * Since for async request, Spring won't do so until the request is finished,
   * which is unacceptable since we are doing long polling - means the db connection would be hold
   * for a very long time
   */
  entityManagerUtil.closeEntityManager();

  List<ApolloConfigNotification> newNotifications =
      getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
          latestReleaseMessages);

  if (!CollectionUtils.isEmpty(newNotifications)) {
    deferredResultWrapper.setResult(newNotifications);
  }

  return deferredResultWrapper.getResult();
}

RemoteConfigRepository

RemoteConfigRepository定時輪詢Config Service的配置讀取/configs/{appId}/{clusterName}/{namespace:.+}
詳細請查看com.ctrip.framework.apollo.configservice.controller.ConfigController的configs/{appId}/{clusterName}/{namespace:.+}接口

RemoteConfigRepository:
protected synchronized void sync() {
  Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

  try {
    //緩存中的apolloConfig
    ApolloConfig previous = m_configCache.get();
    //從configServer獲得apolloConfig
    ApolloConfig current = loadApolloConfig();

    //reference equals means HTTP 304
    //如果不相等說明有更新,更新緩存
    if (previous != current) {
      logger.debug("Remote Config refreshed!");
      m_configCache.set(current);
      //發布事件,由監聽的listener進行消費
      this.fireRepositoryChange(m_namespace, this.getConfig());
    }

    if (current != null) {
      Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
          current.getReleaseKey());
    }

    transaction.setStatus(Transaction.SUCCESS);
  } catch (Throwable ex) {
    transaction.setStatus(ex);
    throw ex;
  } finally {
    transaction.complete();
  }
}

SpringValueRegistry

private void initialize() {
    Executors.newSingleThreadScheduledExecutor(ApolloThreadFactory.create("SpringValueRegistry", true)).scheduleAtFixedRate(new Runnable() {
        public void run() {
            try {
                SpringValueRegistry.this.scanAndClean();
            } catch (Throwable var2) {
                var2.printStackTrace();
            }

        }
    }, 5L, 5L, TimeUnit.SECONDS);
}

private void scanAndClean() {
    Iterator iterator = this.registry.values().iterator();

    while(!Thread.currentThread().isInterrupted() && iterator.hasNext()) {
        Multimap<String, SpringValue> springValues = (Multimap)iterator.next();
        Iterator springValueIterator = springValues.entries().iterator();

        while(springValueIterator.hasNext()) {
            Entry<String, SpringValue> springValue = (Entry)springValueIterator.next();
            if (!((SpringValue)springValue.getValue()).isTargetBeanValid()) {
                springValueIterator.remove();
            }
        }
    }

}

將符合條件的屬性封裝成一個 SpringValue 對象,放在一個Map 中。當 clien 檢測到配置發生變化時,就會更新這個 Map 里面的值,從而達到自動更新的目的。

參考:

https://mp.weixin.qq.com/s/-hUaQPzfsl9Lm3IqQW3VDQ

https://mp.weixin.qq.com/s/ALRSZCvtgv7m8q4tC8qlUg

https://www.jianshu.com/p/915b893eae20

https://blog.csdn.net/qq_40378034/article/details/114778207

https://cloud.tencent.com/developer/article/1878847

https://www.bilibili.com/read/cv11916999/

https://blog.csdn.net/pdwljhlg/article/details/89459786

原文鏈接:https://blog.csdn.net/u013978512/article/details/126336350

欄目分類
最近更新