網站首頁 編程語言 正文
上一篇中進行負載均衡、選擇具體服務時,使用的是 Ribbon 的 LoadBalancer 的 chooseServer 方法,而該方法內部調用的是 Ribbon 的 Rule 的 choose 方法。
看一下Ribbon 的 LoadBalancer
1、LoadBalancer 接口
//com.netflix.loadbalancer.ILoadBalancer
/**
 * Contract of a Ribbon load balancer: maintains a set of servers and picks one per request.
 */
public interface ILoadBalancer {

    /**
     * Adds servers to the load balancer.
     *
     * @param newServers the servers to add
     */
    void addServers(List<Server> newServers);

    /**
     * Chooses a server.
     *
     * @param key an object the implementation may use when choosing
     * @return the chosen server
     */
    Server chooseServer(Object key);

    /**
     * Marks the given server as down.
     *
     * @param server the server to mark
     */
    void markServerDown(Server server);

    /**
     * Returns the server list, selected by availability.
     *
     * @param availableOnly whether only available servers are wanted
     * @return the server list
     * @deprecated superseded by {@link #getReachableServers()} and {@link #getAllServers()}
     */
    @Deprecated
    List<Server> getServerList(boolean availableOnly);

    /**
     * @return the servers that are currently usable
     */
    List<Server> getReachableServers();

    /**
     * @return every known server
     */
    List<Server> getAllServers();
}
2、Loadbalancer的抽象實現類
對比接口,添加了一個枚舉,添加了三個方法
/**
 * Abstract base for load balancers. Compared with {@link ILoadBalancer} it adds a
 * server-group enum, a no-argument {@code chooseServer()} and two abstract accessors.
 */
public abstract class AbstractLoadBalancer implements ILoadBalancer {

    /** Groups by which a server list can be requested. */
    public enum ServerGroup {
        ALL, STATUS_UP, STATUS_NOT_UP
    }

    /**
     * Chooses a server without supplying a key.
     *
     * @return whatever {@link #chooseServer(Object)} returns for a {@code null} key
     */
    public Server chooseServer() {
        final Object noKey = null;
        return chooseServer(noKey);
    }

    /**
     * Returns the servers belonging to the given group.
     *
     * @param serverGroup the group to look up
     * @return the servers in that group
     */
    public abstract List<Server> getServerList(ServerGroup serverGroup);

    /**
     * @return the statistics object of this load balancer
     */
    public abstract LoadBalancerStats getLoadBalancerStats();
}
AbstractLoadBalancer有兩個實現類BaseLoadBalancer,NoOpLoadBalancer
3、BaseLoadBalancer
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
/** Default rule is round-robin; this class is where an IRule is first composed in. */
private static final IRule DEFAULT_RULE = new RoundRobinRule();
/**
 * Interface method: appends the given servers to the current server list.
 * A {@code null} or empty argument is ignored. Any exception raised while
 * building or installing the new list is logged and not rethrown.
 *
 * @param newServers the servers to append
 */
public void addServers(List<Server> newServers) {
    if (newServers == null || newServers.isEmpty()) {
        return;
    }
    try {
        List<Server> merged = new ArrayList<Server>(allServerList);
        merged.addAll(newServers);
        // Per the article's note, setServersList takes a read-write lock internally.
        setServersList(merged);
    } catch (Exception e) {
        logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
    }
}
/**
 * Interface method: picks a server by delegating to the rule's {@code choose}.
 * The invocation counter is created on first use and incremented on every call.
 *
 * @param key passed through to the rule
 * @return the rule's choice, or {@code null} when no rule is set or the rule throws
 */
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();

    Server chosen = null;
    if (rule != null) {
        try {
            chosen = rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
        }
    }
    return chosen;
}
/**
 * Interface method: marks a server as down and notifies the status-change listeners.
 * Does nothing for {@code null} or for a server that is already not alive.
 *
 * @param server the server to take out of rotation
 */
public void markServerDown(Server server) {
    if (server != null && server.isAlive()) {
        logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());
        server.setAlive(false);
        // A forceQuickPing() call is disabled here in the original.
        // Tell listeners that this server's status changed.
        notifyServerStatusChangeListener(singleton(server));
    }
}
/**
 * Interface method: returns the usable servers.
 *
 * @return an unmodifiable view of the up-server list
 */
public List<Server> getReachableServers() {
    List<Server> readOnlyUp = Collections.unmodifiableList(upServerList);
    return readOnlyUp;
}
/**
 * Interface method: returns all servers.
 *
 * @return an unmodifiable view of the full server list
 */
public List<Server> getAllServers() {
    List<Server> readOnlyAll = Collections.unmodifiableList(allServerList);
    return readOnlyAll;
}
/**
 * Abstract-class method: returns the servers of the requested group.
 * <p>
 * {@code ALL} and {@code STATUS_UP} hand back the internal lists themselves;
 * {@code STATUS_NOT_UP} is a fresh list holding all servers minus the up servers.
 * NOTE(review): the first two are not wrapped the way getAllServers() and
 * getReachableServers() wrap theirs; confirm callers never mutate them.
 *
 * @param serverGroup the group to return
 * @return the matching servers, or a new empty list for an unrecognised group
 */
public List<Server> getServerList(ServerGroup serverGroup) {
    switch (serverGroup) {
        case ALL: {
            return allServerList;
        }
        case STATUS_UP: {
            return upServerList;
        }
        case STATUS_NOT_UP: {
            List<Server> down = new ArrayList<Server>(allServerList);
            down.removeAll(new ArrayList<Server>(upServerList));
            return down;
        }
        default: {
            return new ArrayList<Server>();
        }
    }
}
/**
 * Abstract-class method: exposes the load balancer statistics.
 *
 * @return the stats object held by this load balancer
 */
public LoadBalancerStats getLoadBalancerStats() {
    return this.lbStats;
}
}
4、NoOpLoadBalancer 就是什麼都不做
/**
 * A load balancer that does nothing: mutating calls are only logged, no server is
 * ever chosen, and every list accessor yields an empty list.
 * <p>
 * The list accessors return empty lists rather than {@code null}, so that they agree
 * with {@link #getServerList(ServerGroup)} and callers can iterate the result
 * without a null check.
 */
public class NoOpLoadBalancer extends AbstractLoadBalancer {

    /** Ignores the request; only logs that it was ignored. */
    @Override
    public void addServers(List<Server> newServers) {
        logger.info("addServers to NoOpLoadBalancer ignored");
    }

    /**
     * @return always {@code null}, since this balancer holds no servers
     */
    @Override
    public Server chooseServer(Object key) {
        return null;
    }

    /**
     * @return always {@code null}; no statistics are kept
     */
    @Override
    public LoadBalancerStats getLoadBalancerStats() {
        return null;
    }

    /**
     * @return always an empty list, whatever the group
     */
    @Override
    public List<Server> getServerList(ServerGroup serverGroup) {
        return Collections.emptyList();
    }

    /** Ignores the request; only logs that it was ignored. */
    @Override
    public void markServerDown(Server server) {
        logger.info("markServerDown to NoOpLoadBalancer ignored");
    }

    /**
     * @return always an empty list (previously {@code null})
     */
    @Override
    public List<Server> getServerList(boolean availableOnly) {
        return Collections.emptyList();
    }

    /**
     * @return always an empty list (previously {@code null})
     */
    @Override
    public List<Server> getReachableServers() {
        return Collections.emptyList();
    }

    /**
     * @return always an empty list (previously {@code null})
     */
    @Override
    public List<Server> getAllServers() {
        return Collections.emptyList();
    }
}
BaseLoadBalancer有個實現類DynamicServerListLoadBalancer
5、DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
//是否https
boolean isSecure = false;
boolean useTunnel = false;
// server狀態是否正在更新
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
// Implements the dynamic refresh of the server list: each doUpdate() call
// re-runs updateListOfServers(). Per the article, a timer triggers it.
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
...
/**
 * Creates the load balancer from its collaborators and finishes by calling
 * restOfInit(clientConfig).
 *
 * @param clientConfig      client configuration, passed to the superclass and to restOfInit
 * @param rule              rule passed to the superclass
 * @param ping              ping passed to the superclass
 * @param serverList        source of the server list, stored in serverListImpl
 * @param filter            filter applied to the fetched server list
 * @param serverListUpdater updater that will be started with updateAction
 */
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
// Store the updater (the "update timer" in the article's words).
this.serverListUpdater = serverListUpdater;
// Filters derived from AbstractServerListFilter are given this balancer's stats object.
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
// Runs last: restOfInit starts the updater and fetches servers, which use the fields set above.
restOfInit(clientConfig);
}
...
/**
 * Second half of construction: starts the server list updater, loads the server
 * list once, optionally primes connections to the reachable servers, and logs
 * that initialisation finished.
 *
 * @param clientConfig client configuration; only its client name is read here, for logging
 */
void restOfInit(IClientConfig clientConfig) {
// Remember the configured priming flag so it can be restored at the end.
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
// Start the updater (timer) that refreshes the server list.
enableAndInitLearnNewServersFeature();
// Load the server list once right away.
updateListOfServers();
// Prime connections here only if priming was enabled and a PrimeConnections object exists.
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
// Restore the priming flag saved above.
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
/**
 * Refreshes the server list: fetches the updated list from {@code serverListImpl},
 * passes it through {@code filter} when one is set, and installs the result via
 * {@link #updateAllServerList(List)}. With no {@code serverListImpl} an empty list
 * is installed.
 */
@VisibleForTesting
public void updateListOfServers() {
    if (serverListImpl == null) {
        updateAllServerList(new ArrayList<T>());
        return;
    }

    List<T> candidates = serverListImpl.getUpdatedListOfServers();
    LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
            getIdentifier(), candidates);

    if (filter != null) {
        candidates = filter.getFilteredListOfServers(candidates);
        LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), candidates);
    }
    updateAllServerList(candidates);
}
/**
 * Installs the given list as the complete server list.
 * If another update is already in progress, this call returns without doing anything.
 *
 * @param ls the new full list of servers
 */
protected void updateAllServerList(List<T> ls) {
    // Another thread may already be updating; in that case skip this round.
    if (!serverListUpdateInProgress.compareAndSet(false, true)) {
        return;
    }
    try {
        // Mark every server alive so clients can use them right away
        // instead of waiting out the ping cycle.
        for (T server : ls) {
            server.setAlive(true);
        }
        setServersList(ls);
        super.forceQuickPing();
    } finally {
        serverListUpdateInProgress.set(false);
    }
}
...
/**
 * Installs the server list in the superclass (which, per the article, updates
 * allServerList and upServerList), then regroups the servers by lower-cased zone
 * and passes that mapping to {@link #setServerListForZones(Map)}.
 * Servers whose zone is {@code null} are left out of the mapping.
 *
 * @param lsrv the new server list
 */
public void setServersList(List lsrv) {
    super.setServersList(lsrv);

    List<T> typedServers = (List<T>) lsrv;
    Map<String, List<Server>> byZone = new HashMap<String, List<Server>>();
    for (Server server : typedServers) {
        // make sure ServerStats is created to avoid creating them on hot path
        getLoadBalancerStats().getSingleServerStat(server);

        String zone = server.getZone();
        if (zone == null) {
            continue;
        }
        String zoneKey = zone.toLowerCase();
        List<Server> members = byZone.get(zoneKey);
        if (members == null) {
            members = new ArrayList<Server>();
            byZone.put(zoneKey, members);
        }
        members.add(server);
    }
    setServerListForZones(byZone);
}
/**
 * Pushes the zone-to-servers mapping into the load balancer statistics.
 *
 * @param zoneServersMap servers grouped by zone name
 */
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
    LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
    LoadBalancerStats stats = getLoadBalancerStats();
    stats.updateZoneServerMapping(zoneServersMap);
}
...
}
DynamicServerListLoadBalancer中添加了一個屬性ServerListUpdater.UpdateAction,通過匿名內部類實現。
/**
 * Logs which updater implementation is in use and starts it with {@code updateAction}.
 */
public void enableAndInitLearnNewServersFeature() {
    String updaterName = serverListUpdater.getClass().getSimpleName();
    LOGGER.info("Using serverListUpdater {}", updaterName);
    serverListUpdater.start(updateAction);
}
6、ZoneAwareLoadBalancer
DynamicServerListLoadBalancer的唯一子類,也是Ribbon默認使用的負載均衡器
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
//key:zone, value:LoadBalancer
private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
private static final Logger logger = LoggerFactory.getLogger(ZoneAwareLoadBalancer.class);
private volatile DynamicDoubleProperty triggeringLoad;
private volatile DynamicDoubleProperty triggeringBlackoutPercentage;
private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory.getInstance().getBooleanProperty("ZoneAwareNIWSDiscoveryLoadBalancer.enabled", true);
...
/**
 * Sets the zone-to-servers mapping: updates the stats through the superclass, gives
 * each zone's balancer its server list, and empties the list of every known zone
 * balancer whose zone is missing from the new mapping.
 *
 * @param zoneServersMap servers grouped by zone name
 */
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
    super.setServerListForZones(zoneServersMap);

    // NOTE(review): this can presumably be reached from the superclass constructor
    // (via restOfInit), before this class's field initialisers have run, which
    // would explain the null check. Confirm.
    if (balancers == null) {
        balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
    }

    for (Map.Entry<String, List<Server>> zoneEntry : zoneServersMap.entrySet()) {
        String zoneKey = zoneEntry.getKey().toLowerCase();
        BaseLoadBalancer zoneBalancer = getLoadBalancer(zoneKey);
        zoneBalancer.setServersList(zoneEntry.getValue());
    }

    // check if there is any zone that no longer has a server and set the list
    // to empty so that the zone related metrics does not contain stale data
    for (Map.Entry<String, BaseLoadBalancer> known : balancers.entrySet()) {
        boolean stillPresent = zoneServersMap.containsKey(known.getKey());
        if (!stillPresent) {
            known.getValue().setServersList(Collections.emptyList());
        }
    }
}
/**
 * Chooses a server (covered in the previous article).
 * <p>
 * Delegates straight to the superclass when the ENABLED property is false or the
 * stats report at most one available zone. Otherwise it snapshots the zones, works
 * out the available ones from the two thresholds, and, only when that set is smaller
 * than the snapshot, picks a zone and asks that zone's balancer for a server.
 * Any exception in this path is logged. If no server was obtained, the superclass
 * chooses instead.
 *
 * @param key passed through to whichever balancer makes the choice
 * @return the chosen server, as returned by the zone balancer or by the superclass
 */
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
// Look up the per-client load threshold property on first use (default 0.2).
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
// Look up the per-client blackout percentage property on first use (default 0.99999).
// The spelling "Percetage" is the actual property key and must stay as is.
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
// Pick a zone only when at least one zone was excluded from the available set.
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
// Log and fall through; the superclass choice below acts as the fallback.
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
/**
 * Returns the load balancer for a zone, creating and registering one on first request.
 * The zone name is lower-cased before lookup. If another balancer was registered for
 * the zone in the meantime, that one is returned instead of the new one.
 *
 * @param zone the zone name
 * @return the balancer registered for that zone
 */
@VisibleForTesting
BaseLoadBalancer getLoadBalancer(String zone) {
    zone = zone.toLowerCase();
    BaseLoadBalancer existing = balancers.get(zone);
    if (existing != null) {
        return existing;
    }
    // We need to create rule object for load balancer for each zone
    IRule zoneRule = cloneRule(this.getRule());
    BaseLoadBalancer created =
            new BaseLoadBalancer(this.getName() + "_" + zone, zoneRule, this.getLoadBalancerStats());
    BaseLoadBalancer raced = balancers.putIfAbsent(zone, created);
    return raced != null ? raced : created;
}
/**
 * Produces a new rule of the same class as the given one, instantiated with this
 * balancer's client config. A {@code null} argument yields a new
 * {@code AvailabilityFilteringRule}.
 *
 * @param toClone the rule whose class is reused, may be {@code null}
 * @return a freshly created rule
 * @throws RuntimeException if instantiating the rule class fails
 */
private IRule cloneRule(IRule toClone) {
    if (toClone == null) {
        return new AvailabilityFilteringRule();
    }
    String ruleClass = toClone.getClass().getName();
    try {
        return (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
    } catch (Exception e) {
        throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
    }
}
/**
 * Sets the rule on this balancer and gives every existing zone balancer its own
 * clone of it.
 *
 * @param rule the rule to apply
 */
@Override
public void setRule(IRule rule) {
    super.setRule(rule);
    if (balancers == null) {
        return;
    }
    for (BaseLoadBalancer zoneBalancer : balancers.values()) {
        zoneBalancer.setRule(cloneRule(rule));
    }
}
}
在RibbonClientConfiguration注冊ZoneAwareLoadBalancer時,同時注冊了serverList和serverListUpdater。
/**
 * Provides the {@code IRule} bean: the one supplied by {@code propertiesFactory}
 * when it reports an IRule as set for this client name, otherwise a
 * {@code ZoneAvoidanceRule} initialised with the client config.
 */
@Bean
@ConditionalOnMissingBean
public IRule ribbonRule(IClientConfig config) {
    boolean configured = this.propertiesFactory.isSet(IRule.class, name);
    if (!configured) {
        ZoneAvoidanceRule defaultRule = new ZoneAvoidanceRule();
        defaultRule.initWithNiwsConfig(config);
        return defaultRule;
    }
    return this.propertiesFactory.get(IRule.class, config, name);
}
/**
 * Provides the {@code IPing} bean: the one supplied by {@code propertiesFactory}
 * when it reports an IPing as set for this client name, otherwise a {@code DummyPing}.
 */
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
    boolean configured = this.propertiesFactory.isSet(IPing.class, name);
    if (!configured) {
        return new DummyPing();
    }
    return this.propertiesFactory.get(IPing.class, config, name);
}
/**
 * Provides the {@code ServerList} bean: the one supplied by {@code propertiesFactory}
 * when it reports a ServerList as set for this client name, otherwise a
 * {@code ConfigurationBasedServerList} initialised with the client config.
 */
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
    boolean configured = this.propertiesFactory.isSet(ServerList.class, name);
    if (!configured) {
        ConfigurationBasedServerList defaultList = new ConfigurationBasedServerList();
        defaultList.initWithNiwsConfig(config);
        return defaultList;
    }
    return this.propertiesFactory.get(ServerList.class, config, name);
}
/**
 * Provides the {@code ServerListUpdater} bean: a {@code PollingServerListUpdater}
 * built from the client config.
 */
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
    ServerListUpdater pollingUpdater = new PollingServerListUpdater(config);
    return pollingUpdater;
}
/**
 * Provides the {@code ILoadBalancer} bean: the one supplied by {@code propertiesFactory}
 * when it reports an ILoadBalancer as set for this client name, otherwise a
 * {@code ZoneAwareLoadBalancer} wired with the injected rule, ping, server list,
 * filter and updater.
 */
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    boolean configured = this.propertiesFactory.isSet(ILoadBalancer.class, name);
    if (!configured) {
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
    return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
/**
 * Provides the {@code ServerListFilter} bean: the one supplied by {@code propertiesFactory}
 * when it reports a ServerListFilter as set for this client name, otherwise a
 * {@code ZonePreferenceServerListFilter} initialised with the client config.
 */
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
    boolean configured = this.propertiesFactory.isSet(ServerListFilter.class, name);
    if (!configured) {
        ZonePreferenceServerListFilter defaultFilter = new ZonePreferenceServerListFilter();
        defaultFilter.initWithNiwsConfig(config);
        return defaultFilter;
    }
    return this.propertiesFactory.get(ServerListFilter.class, config, name);
}
由此可見,ribbonServerListUpdater是注入的PollingServerListUpdater。
在上面DynamicServerListLoadBalancer的enableAndInitLearnNewServersFeature方法中調用了ServerListUpdater的start方法,實際上就是PollingServerListUpdater的start方法。
//com.netflix.loadbalancer.PollingServerListUpdater#start
/**
 * Starts periodic execution of the given update action. Only the first call has an
 * effect; while the updater is already active, later calls just log and return.
 * <p>
 * Each run invokes {@code updateAction.doUpdate()} and records the time of the update;
 * an exception from one cycle is logged and not rethrown. A run that finds the
 * updater inactive cancels the scheduled future instead.
 *
 * @param updateAction the action to execute on every cycle
 */
public synchronized void start(final UpdateAction updateAction) {
    if (!isActive.compareAndSet(false, true)) {
        logger.info("Already active, no-op");
        return;
    }

    final Runnable refreshTask = new Runnable() {
        @Override
        public void run() {
            if (!isActive.get()) {
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
                return;
            }
            try {
                // Calls the update implemented in DynamicServerListLoadBalancer.
                updateAction.doUpdate();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                logger.warn("Failed one update cycle", e);
            }
        }
    };

    // The article states a 1 s initial delay and a 30 s interval; the actual values
    // come from initialDelayMs / refreshIntervalMs. TODO confirm the defaults.
    scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            refreshTask,
            initialDelayMs,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS);
}
定時器會刷新服務列表
原文鏈接:https://blog.csdn.net/xuwenjingrenca/article/details/125032840
相關推薦
- 2022-09-23 Go語言結構體Go range的學習教程_Golang
- 2022-07-28 聊聊docker跨主機之間容器通信問題_docker
- 2022-10-14 yum-config-manager 命令找不到的解決方法
- 2022-11-09 Apifox Echo學習curl httpie 命令使用詳解_相關技巧
- 2022-03-16 linux下FastDFS搭建圖片服務器_Linux
- 2022-08-21 Android使用貝塞爾曲線畫心形_Android
- 2022-04-19 python tkinter實現彈窗的輸入輸出_python
- 2022-04-01 關於python中 if __name__ == '__main__' 的理解
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支