網站首頁 編程語言 正文
1、接口 IRule
/**
 * Contract for a load-balancing rule: picks a server and holds a reference
 * to the load balancer it works with.
 */
public interface IRule {

    /**
     * Chooses a server.
     *
     * @param key selection key supplied by the caller
     * @return the chosen server
     */
    Server choose(Object key);

    /**
     * Sets the load balancer used by this rule.
     *
     * @param lb the load balancer
     */
    void setLoadBalancer(ILoadBalancer lb);

    /**
     * Returns the load balancer used by this rule.
     *
     * @return the load balancer
     */
    ILoadBalancer getLoadBalancer();
}
2、接口的抽象實現類 AbstractLoadBalancerRule
/**
 * Base class for rules: stores the load balancer and exposes it through the
 * {@link IRule} accessors. Server selection is left to subclasses.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    /** Load balancer attached to this rule; {@code null} until one is set. */
    private ILoadBalancer loadBalancer;

    /**
     * Stores the given load balancer.
     *
     * @param lb the load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        loadBalancer = lb;
    }

    /**
     * @return the load balancer previously stored, or {@code null} if none was set
     */
    @Override
    public ILoadBalancer getLoadBalancer() {
        return loadBalancer;
    }
}
上一篇中,IRule 是在 BaseLoadBalancer 中完成綁定的;而在這裡,則是通過接口的 setLoadBalancer 方法,把 LoadBalancer 設置到 IRule 的實現類中。
3、輪詢 RoundRobinRule
/**
 * Rule that walks the load balancer's server list in cyclic order, skipping
 * servers that are not alive or not ready to serve.
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {

    /** Cyclic position of the most recently handed-out index. */
    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;
    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    /** Creates a rule with its counter at zero and no load balancer. */
    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    /**
     * Creates a rule and attaches it to the given load balancer.
     *
     * @param lb the load balancer to use
     */
    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    /**
     * Picks the next server in cyclic order from {@code lb}, making at most
     * ten attempts to find one that is alive and ready to serve.
     *
     * @param lb  the load balancer to pick from
     * @param key selection key (not used by this implementation)
     * @return a usable server, or {@code null} if {@code lb} is {@code null},
     *         no servers are reachable/registered, or all ten attempts failed
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        for (int attempt = 0; attempt < 10; attempt++) {
            // Re-read both lists on every attempt.
            List<Server> upServers = lb.getReachableServers();
            List<Server> everyServer = lb.getAllServers();
            int upTotal = upServers.size();
            int total = everyServer.size();
            if (upTotal == 0 || total == 0) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Advance the shared counter and index into the full server list.
            Server candidate = everyServer.get(incrementAndGetModulo(total));
            if (candidate == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            if (candidate.isAlive() && candidate.isReadyToServe()) {
                return candidate;
            }
            // Otherwise move on to the next attempt.
        }

        // All ten attempts were used up without finding a usable server.
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
        return null;
    }

    /**
     * Atomically advances the counter to {@code (current + 1) % modulo}.
     *
     * @param modulo the number of servers to cycle over
     * @return the new counter value
     */
    private int incrementAndGetModulo(int modulo) {
        int seen;
        int advanced;
        do {
            seen = nextServerCyclicCounter.get();
            advanced = (seen + 1) % modulo;
        } while (!nextServerCyclicCounter.compareAndSet(seen, advanced));
        return advanced;
    }

    /**
     * Picks a server from the load balancer attached to this rule.
     *
     * @param key selection key
     * @return see {@link #choose(ILoadBalancer, Object)}
     */
    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    /** No configuration is read by this rule. */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
4、權重輪詢WeightedResponseTimeRule
將 server 的應答時間的平均值或者百分比作為權重(下面的實現使用的是平均應答時間:平均應答時間越短,權重越大,被選中的概率越高)
/**
 * Round-robin variant that picks servers at random, weighted by response time.
 *
 * <p>A timer task periodically recomputes a list of accumulated weights in
 * which each server contributes {@code totalAvgResponseTime - ownAvgResponseTime},
 * so servers with a lower average response time get a larger share. Until
 * weights are available (or when they do not match the current server count)
 * selection falls back to {@link RoundRobinRule}.
 */
public class WeightedResponseTimeRule extends RoundRobinRule {

    /** Configuration key for the weight recalculation interval. */
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
        @Override
        public String key() {
            return "ServerWeightTaskTimerInterval";
        }

        @Override
        public String toString() {
            return key();
        }

        @Override
        public Class<Integer> type() {
            return Integer.class;
        }
    };

    /** Default recalculation period: 30 seconds, in milliseconds. */
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;

    /** Period of the weight recalculation timer, in milliseconds. */
    private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;

    private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);

    /**
     * Accumulated weights, one entry per server; each entry is the sum of its
     * own weight and all preceding ones, so the last entry is the total.
     */
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

    private final Random random = new Random();

    protected Timer serverWeightTimer = null;

    /** Guards against overlapping weight recalculations. */
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);

    String name = "unknown";

    /** Ensures the JVM shutdown hook is registered at most once per rule instance. */
    private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);

    /** Creates a rule with no load balancer attached. */
    public WeightedResponseTimeRule() {
        super();
    }

    /**
     * Creates a rule attached to the given load balancer.
     *
     * @param lb the load balancer to use
     */
    public WeightedResponseTimeRule(ILoadBalancer lb) {
        // Do not delegate to super(lb): that would invoke the overridden
        // setLoadBalancer()/initialize() before this class's field initializers
        // have run (timer interval still 0, collaborators still null).
        super();
        setLoadBalancer(lb);
    }

    /**
     * Stores the load balancer, picks up its name when it is a
     * {@code BaseLoadBalancer}, and (re)starts the weight timer.
     *
     * @param lb the load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            name = ((BaseLoadBalancer) lb).getName();
        }
        initialize(lb);
    }

    /**
     * Cancels any existing timer, schedules a new daemon timer that recomputes
     * the weights every {@code serverWeightTaskTimerInterval} milliseconds,
     * performs one synchronous recalculation, and registers a JVM shutdown hook
     * (once) that cancels the current timer.
     *
     * @param lb the load balancer (not read directly here)
     */
    void initialize(ILoadBalancer lb) {
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do an initial run
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();

        // The hook reads the serverWeightTimer field when it runs, so a single
        // registration covers every timer created by later initialize() calls.
        // Registering a new hook per call would accumulate hook threads.
        if (shutdownHookRegistered.compareAndSet(false, true)) {
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    logger
                            .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                    + name);
                    serverWeightTimer.cancel();
                }
            }));
        }
    }

    /** Cancels the weight timer, if one has been started. */
    public void shutdown() {
        if (serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
            serverWeightTimer.cancel();
        }
    }

    /**
     * @return an unmodifiable view of the current accumulated weights
     */
    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(accumulatedWeights);
    }

    /**
     * Chooses a server at random in proportion to its weight, falling back to
     * round-robin when weights are unavailable or stale.
     *
     * <p>NOTE: unlike {@link RoundRobinRule}, this loop has no attempt limit;
     * it keeps retrying while the picked server is {@code null} or not alive.
     * It returns {@code null} if the calling thread is interrupted; the
     * {@code Thread.interrupted()} check clears the thread's interrupt status.
     *
     * @param lb  the load balancer to pick from
     * @param key selection key, passed to the round-robin fallback
     * @return an alive server, or {@code null} if {@code lb} is {@code null},
     *         the thread was interrupted, there are no servers, or the
     *         round-robin fallback returned {@code null}
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // Read the volatile field once so this iteration sees one consistent list.
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            // The last accumulated entry is the sum of all weights.
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
            // Weights not initialized, or out of step with the server list:
            // fall back to plain round-robin.
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server = super.choose(getLoadBalancer(), key);
                if (server == null) {
                    return server;
                }
            } else {
                // Random value in [0, maxTotalWeight).
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // Pick the first server whose accumulated weight reaches the random value.
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }
        return server;
    }

    /** Timer task that recomputes the weights and logs any failure. */
    class DynamicServerWeightTask extends TimerTask {
        @Override
        public void run() {
            ServerWeight serverWeight = new ServerWeight();
            try {
                serverWeight.maintainWeights();
            } catch (Exception e) {
                logger.error("Error running DynamicServerWeightTask for {}", name, e);
            }
        }
    }

    /** Weight recalculation logic. */
    class ServerWeight {

        /**
         * Recomputes the accumulated weights from the current per-server
         * average response times and publishes them via {@code setWeights}.
         * Does nothing when no load balancer is set, when another
         * recalculation is in progress, or when no statistics are available.
         * Exceptions are logged, not propagated.
         */
        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }

            if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                return;
            }

            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }

                // Take one snapshot of the servers and of each server's average
                // response time, so the total and the per-server weights are
                // derived from exactly the same data.
                List<Server> servers = new ArrayList<Server>(nlb.getAllServers());
                double[] averages = new double[servers.size()];
                double totalResponseTime = 0;
                for (int i = 0; i < averages.length; i++) {
                    ServerStats ss = stats.getSingleServerStat(servers.get(i));
                    averages[i] = ss.getResponseTimeAvg();
                    totalResponseTime += averages[i];
                }

                // weight for each server is (sum of all averages - own average);
                // entries are accumulated so the last one is the overall total.
                double weightSoFar = 0.0;
                List<Double> finalWeights = new ArrayList<Double>(averages.length);
                for (double average : averages) {
                    weightSoFar += totalResponseTime - average;
                    finalWeights.add(weightSoFar);
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }
        }
    }

    /**
     * Replaces the accumulated weights.
     *
     * @param weights the new accumulated weights
     */
    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    /**
     * Reads the timer interval from the client configuration, defaulting to
     * {@link #DEFAULT_TIMER_INTERVAL}. The value is used the next time the
     * timer is (re)started.
     *
     * @param clientConfig the client configuration
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);
    }
}
5、ClientConfigEnabledRoundRobinRule
/**
 * Rule that delegates every choice to an internal {@link RoundRobinRule}.
 * Subclasses in this file call {@code super.choose(key)} or use
 * {@code roundRobinRule} as a fallback.
 */
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

    /** The delegate that performs the actual selection. */
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    /**
     * Replaces the delegate with a fresh {@link RoundRobinRule}, carrying over
     * the load balancer already attached to this rule (if any).
     *
     * @param clientConfig the client configuration (not read here)
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        RoundRobinRule replacement = new RoundRobinRule();
        // Without this, calling setLoadBalancer() before initWithNiwsConfig()
        // would leave the new delegate with no load balancer, and it would
        // return null from every choose().
        replacement.setLoadBalancer(getLoadBalancer());
        roundRobinRule = replacement;
    }

    /**
     * Stores the load balancer on this rule and on the delegate.
     *
     * @param lb the load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        roundRobinRule.setLoadBalancer(lb);
    }

    /**
     * Delegates to the internal round-robin rule.
     *
     * @param key selection key
     * @return the server picked by the delegate
     * @throws IllegalArgumentException if the delegate is {@code null}
     */
    @Override
    public Server choose(Object key) {
        if (roundRobinRule != null) {
            return roundRobinRule.choose(key);
        } else {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
    }
}
這個策略本身什麼也沒有做,只是直接委託給內部的 RoundRobinRule;但它是以下這些策略的基礎類,為它們提供了兜底能力:
BestAvailableRule,PredicateBasedRule,ZoneAvoidanceRule,AvailabilityFilteringRule。
6、BestAvailableRule 最小連接數
/**
 * Rule that picks the server with the fewest active requests among those
 * whose circuit breaker is not tripped, falling back to the parent rule when
 * no statistics or no candidate is available.
 */
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    /** Statistics source; stays {@code null} unless the load balancer is an {@code AbstractLoadBalancer}. */
    private LoadBalancerStats loadBalancerStats;

    /**
     * Chooses the least-loaded server whose circuit breaker is not tripped.
     *
     * @param key selection key, used only by the fallback
     * @return the server with the lowest active request count, or the parent
     *         rule's choice when statistics are missing or nothing qualified
     */
    @Override
    public Server choose(Object key) {
        // No statistics: nothing to compare, so defer to the parent rule.
        if (loadBalancerStats == null) {
            return super.choose(key);
        }

        List<Server> candidates = getLoadBalancer().getAllServers();
        int fewestActive = Integer.MAX_VALUE;
        long now = System.currentTimeMillis();
        Server best = null;

        for (Server candidate : candidates) {
            ServerStats stats = loadBalancerStats.getSingleServerStat(candidate);
            // Skip servers whose circuit breaker is currently tripped.
            if (stats.isCircuitBreakerTripped(now)) {
                continue;
            }
            int active = stats.getActiveRequestsCount(now);
            // Strictly-less comparison keeps the first server seen on ties.
            if (active < fewestActive) {
                fewestActive = active;
                best = candidate;
            }
        }

        // Nothing qualified: let the parent rule pick.
        return best != null ? best : super.choose(key);
    }

    /**
     * Stores the load balancer and, when it is an {@code AbstractLoadBalancer},
     * captures its statistics object.
     *
     * @param lb the load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof AbstractLoadBalancer) {
            AbstractLoadBalancer statsSource = (AbstractLoadBalancer) lb;
            loadBalancerStats = statsSource.getLoadBalancerStats();
        }
    }
}
7、PredicateBasedRule 斷言基礎策略
/**
 * Base class for rules that filter the server list with a predicate and then
 * pick from the remaining servers via the predicate's
 * {@code chooseRoundRobinAfterFiltering}.
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /**
     * @return the predicate used to filter servers
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Filters all servers of the load balancer with {@link #getPredicate()}
     * and returns the server selected from the result.
     *
     * @param key selection key handed to the predicate
     * @return the selected server, or {@code null} when none is present
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer balancer = getLoadBalancer();
        Optional<Server> picked = getPredicate().chooseRoundRobinAfterFiltering(balancer.getAllServers(), key);
        return picked.isPresent() ? picked.get() : null;
    }
}
8、AvailabilityFilteringRule 可用性過濾
/**
 * Rule that picks servers round-robin and keeps only those accepted by an
 * availability predicate, with an always-true fallback predicate.
 */
public class AvailabilityFilteringRule extends PredicateBasedRule {

    private AbstractServerPredicate predicate;

    /** Builds the rule with an availability predicate created without client configuration. */
    public AvailabilityFilteringRule() {
        super();
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Rebuilds the predicate using the given client configuration.
     *
     * @param clientConfig the client configuration passed to the availability predicate
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Counts the servers accepted by the predicate's server-only view.
     *
     * @return the number of accepted servers, or 0 when the server list is {@code null}
     */
    @Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)
    public int getAvailableServersCount() {
        ILoadBalancer lb = getLoadBalancer();
        List<Server> servers = lb.getAllServers();
        if (servers == null) {
            return 0;
        }
        return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();
    }

    /**
     * Picks servers round-robin and returns the first one the predicate
     * accepts. The loop condition {@code count++ <= 10} allows up to eleven
     * checks; after that the choice is delegated to the parent rule.
     *
     * @param key selection key
     * @return an accepted server, or the parent rule's choice if none was
     *         accepted within the attempt limit
     */
    @Override
    public Server choose(Object key) {
        int count = 0;
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            // The round-robin delegate returns null when it has no load balancer
            // or no servers; never hand a null server to the predicate.
            if (server != null && predicate.apply(new PredicateKey(server))) {
                return server;
            }
            server = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

    /**
     * @return the composite availability predicate
     */
    @Override
    public AbstractServerPredicate getPredicate() {
        return predicate;
    }
}
9、ZoneAvoidanceRule
/**
 * Predicate-based rule that combines a zone-avoidance predicate with an
 * availability predicate, and provides static helpers for deciding which
 * zones are currently usable.
 */
public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();

    private CompositePredicate compositePredicate;

    /** Builds the rule with predicates created without client configuration. */
    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    /**
     * Combines both predicates, falling back first to the availability
     * predicate alone and then to an always-true predicate.
     */
    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                .addFallbackPredicate(p2)
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Rebuilds the composite predicate using the given client configuration.
     *
     * @param clientConfig the client configuration passed to both predicates
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    /**
     * Builds a map from zone name to its snapshot for every available zone
     * reported by {@code lbStats}.
     *
     * @param lbStats the load balancer statistics
     * @return a new map of zone name to snapshot
     */
    static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
        Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
        for (String zone : lbStats.getAvailableZones()) {
            ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
            map.put(zone, snapshot);
        }
        return map;
    }

    /**
     * Picks one zone from {@code chooseFrom} at random, with probability
     * proportional to each zone's instance count.
     *
     * @param snapshot   zone snapshots, keyed by zone name
     * @param chooseFrom the candidate zones
     * @return the selected zone; {@code null} when {@code chooseFrom} is
     *         {@code null} or empty; the first candidate when there is only
     *         one candidate or the candidates have no instances in total
     */
    static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
            Set<String> chooseFrom) {
        if (chooseFrom == null || chooseFrom.size() == 0) {
            return null;
        }
        String selectedZone = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return selectedZone;
        }
        int totalServerCount = 0;
        for (String zone : chooseFrom) {
            totalServerCount += snapshot.get(zone).getInstanceCount();
        }
        if (totalServerCount <= 0) {
            // Random.nextInt(bound) rejects a non-positive bound; with no
            // instances to weight by, keep the first candidate.
            return selectedZone;
        }
        // index is in [1, totalServerCount]; walk the running sum to find its zone.
        int index = random.nextInt(totalServerCount) + 1;
        int sum = 0;
        for (String zone : chooseFrom) {
            sum += snapshot.get(zone).getInstanceCount();
            if (index <= sum) {
                selectedZone = zone;
                break;
            }
        }
        return selectedZone;
    }

    /**
     * Computes the set of usable zones from the given snapshots.
     *
     * <p>Zones with no instances, with a tripped-circuit ratio at or above
     * {@code triggeringBlackoutPercentage}, or with a negative load per server
     * are removed. If nothing was removed and the highest load per server is
     * below {@code triggeringLoad}, the remaining set is returned as is;
     * otherwise one of the most-loaded zones is chosen at random and removed
     * as well.
     *
     * @param snapshot                     zone snapshots, keyed by zone name
     * @param triggeringLoad               load-per-server threshold
     * @param triggeringBlackoutPercentage tripped-circuit ratio threshold
     * @return the usable zones, or {@code null} when {@code snapshot} is empty
     */
    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        // A single zone is never avoided.
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                // No instances in this zone: drop it.
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                // Too many tripped circuits, or a negative load: drop the zone.
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    // Track the zone(s) with the highest load per server;
                    // loads within 0.000001 of the maximum count as ties.
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }

        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        // Randomly pick one of the worst zones and exclude it.
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;
    }

    /**
     * Computes the usable zones from live statistics by first taking a
     * snapshot of every available zone.
     *
     * @param lbStats                      the load balancer statistics
     * @param triggeringLoad               load-per-server threshold
     * @param triggeringBlackoutPercentage tripped-circuit ratio threshold
     * @return the usable zones, or {@code null} when {@code lbStats} is
     *         {@code null} or it reports no zones
     */
    public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
            double triggeringLoad, double triggeringBlackoutPercentage) {
        if (lbStats == null) {
            return null;
        }
        Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
        return getAvailableZones(snapshot, triggeringLoad,
                triggeringBlackoutPercentage);
    }

    /**
     * @return the composite zone-avoidance/availability predicate
     */
    @Override
    public AbstractServerPredicate getPredicate() {
        return compositePredicate;
    }
}
原文鏈接:https://blog.csdn.net/xuwenjingrenca/article/details/125032956
相關推薦
- 2022-10-23 python如何在一個py文件中獲取另一個py文件中的值(一個或多個)_python
- 2023-11-12 Check failed: top_shape[j] == bottom[i]->shape(j)
- 2023-03-18 C#調用dll報錯:無法加載dll,找不到指定模塊的解決_C#教程
- 2022-04-15 python中對正則表達式re包的簡單引用方式_python
- 2022-12-15 C++進程鏈接工具之通信器詳解_C 語言
- 2022-07-13 nginx-1.20*安裝check模塊
- 2022-08-02 C#5.0中的異步編程關鍵字async和await_C#教程
- 2022-03-03 GitHub 私人private倉庫添加成員(協作者Collaborators)
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支