时间:2023-04-28 08:29:21来源:搜狐
今天带来负载均衡 策略「负载均衡方案」,关于负载均衡 策略「负载均衡方案」很多人还不知道,现在让我们一起来看看吧!
1 三组概念欢迎大家关注今日头条号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考
负载均衡、集群容错、服务降级这三个概念在DUBBO中非常重要,同理其它分布式框架也都有相同或者相近之概念。
从调用顺序角度分析,调用顺序依次是负载均衡、集群容错、服务降级。从解决问题角度分析,负载均衡解决了「选哪一个」问题,集群容错解决了「换哪一个」问题,服务降级解决了「全错怎么办」问题。
假设有1个服务消费者面对10个提供者,这时面临第一个问题就是「选哪一个」进行调用,所以负载均衡最先调用,假设选定了5号服务提供者进行服务调用。
假设消费者调用5号提供者发生了超时异常,这时面临第二个问题就是「换哪一个」进行调用:5号超时要不要换1号试一试,或者直接返回不进行重试,所以集群容错第二个调用。
假设已经重试了1号、3号、6号提供者全部超时,这时面临「全错怎么办」这第三个问题,这时可以直接返回一个固定值或者提示文案,所以服务降级第三个调用。
负载均衡作为整个链路第一个节点非常重要,本文结合DUBBO源码分析以下七种负载均衡策略:
简单随机加权随机简单轮询简单加权轮询平滑加权轮询一致性哈希最少活跃数简单随机含义是服务消费者每次会任意访问一个服务提供者,并且从概率角度看每个提供者被访问概率一致,可以通过指定范围随机数实现。第一步编写服务器代码
public class Myserver {private String ip;public MyServer(String ip) {this.ip = ip;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}}
第二步编写基础负载均衡策略
public abstract class AbstractLoadBalance {public MyServer select(List<MyServer> serverList) {return doSelect(serverList);}public abstract MyServer doSelect(List<MyServer> serverList);}
第三步编写简单随机策略
public class RandomBalance extends AbstractLoadBalance {@Overridepublic MyServer doSelect(List<MyServer> serverList) {// 随机数范围[0,serverListSize)int index = ThreadLocalRandom.current().nextInt(serverList.size());return serverList.get(index);}}
第四步编写测试代码
public class LoadBalanceTest {public static void main(String[] args) {List<MyServer> serverList = buildData();testRandomBalance(serverList);}public static void testRandomBalance(List<MyServer> serverList) {AbstractLoadBalance randomBalance = new RandomBalance();for (int i = 0; i < 10; i) {MyServer server = randomBalance.select(serverList);System.out.println("RandomBalance route server=" server);}}public static List<MyServer> buildData() {List<MyServer> serverList = new ArrayList<MyServer>();MyServer server1 = new MyServer("192.1.1.1");MyServer server2 = new MyServer("192.1.1.2");MyServer server3 = new MyServer("192.1.1.3");serverList.add(server1);serverList.add(server2);serverList.add(server3);return serverList;}}
第五步输出结果,循环次数越多结果越准确
RandomBalance route server=MyServer(ip=192.1.1.2)RandomBalance route server=MyServer(ip=192.1.1.1)RandomBalance route server=MyServer(ip=192.1.1.3)RandomBalance route server=MyServer(ip=192.1.1.2)RandomBalance route server=MyServer(ip=192.1.1.1)RandomBalance route server=MyServer(ip=192.1.1.1)RandomBalance route server=MyServer(ip=192.1.1.2)RandomBalance route server=MyServer(ip=192.1.1.2)RandomBalance route server=MyServer(ip=192.1.1.3)RandomBalance route server=MyServer(ip=192.1.1.3)
加权随机新增了权重概念,假设服务器A权重等于1,服务器B权重等于5,从概率角度看B服务器被访问概率5倍于A服务器。实现按照权重访问有很多种方式,我们选择使用概率区间这个思路。
假设现在有三台服务器,服务器权重分别是3、5、2,那么三者构成如下图概率区间:
假设随机数等于8,其位于[8,9]区间,所以选择server3,下图说明了概率区间计算步骤:
第一步编写服务器代码
public class MyServer {private String ip;private int weight;public MyServer(String ip) {this.ip = ip;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public int getWeight() {return weight;}public void setWeight(int weight) {this.weight = weight;}}
第二步编写加权随机策略
public class RandomWeightBalance extends AbstractLoadBalance {@Overridepublic MyServer doSelect(List<MyServer> serverList) {// 所有服务器总权重int totalWeight = 0;// 第一个服务器权重int firstWeight = serverList.get(0).getWeight();// 所有服务器权重相等boolean sameWeight = true;// 遍历所有服务器for (MyServer server : serverList) {// 计算总权重totalWeight= server.getWeight();// 任意一个invoker权重不等于第一个权重则设置sameWeight=falseif (sameWeight && server.getWeight() != firstWeight) {sameWeight = false;}}// 权重不相等则根据权重选择if (!sameWeight) {// 在总区间范围[0,totalWeight)生成随机数AInteger offset = ThreadLocalRandom.current().nextInt(totalWeight);// 遍历所有服务器区间for (MyServer server : serverList) {// 如果A在server区间直接返回if (offset < server.getWeight()) {return server;}// 如果A不在server区间则减去此区间范围并继续匹配其它区间offset -= server.getWeight();}}// 所有服务器权重相等则随机选择return serverList.get(ThreadLocalRandom.current().nextInt(serverList.size()));}}
第三步编写测试代码
public class LoadBalanceTest {public static void main(String[] args) {List<MyServer> serverList = buildData();testRandomWeightBalance(serverList);}public static void testRandomWeightBalance(List<MyServer> serverList) {AbstractLoadBalance randomBalance = new RandomWeightBalance();for (int i = 0; i < 10; i) {MyServer server = randomBalance.select(serverList);System.out.println("RandomWeightBalance route server=" server);}}public static List<MyServer> buildData() {List<MyServer> serverList = new ArrayList<MyServer>();MyServer server1 = new MyServer("192.1.1.1", 3);MyServer server2 = new MyServer("192.1.1.2", 5);MyServer server3 = new MyServer("192.1.1.3", 2);serverList.add(server1);serverList.add(server2);serverList.add(server3);return serverList;}}
第四步输出结果,循环次数越多结果越准确
RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=2)RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=3)RandomWeightBalance route server=MyServer(ip=192.1.1.1, weight=3)RandomWeightBalance route server=MyServer(ip=192.1.1.1, weight=3)RandomWeightBalance route server=MyServer(ip=192.1.1.3, weight=2)RandomWeightBalance route server=MyServer(ip=192.1.1.3, weight=2)RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=5)RandomWeightBalance route server=MyServer(ip=192.1.1.1, weight=3)RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=5)RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=5)
public class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// invoker数量int length = invokers.size();// 所有权重是否相等boolean sameWeight = true;// 权重数组int[] weights = new int[length];// 第一个权重int firstWeight = getWeight(invokers.get(0), invocation);weights[0] = firstWeight;// 权重之和int totalWeight = firstWeight;// 遍历所有invokerfor (int i = 1; i < length; i) {// 获取权重int weight = getWeight(invokers.get(i), invocation);weights[i] = weight;// 计算总权重totalWeight= weight;// 任意一个invoker权重不等于第一个权重则设置sameWeight=falseif (sameWeight && weight != firstWeight) {sameWeight = false;}}// 权重不相等则根据权重选择if (totalWeight > 0 && !sameWeight) {int offset = ThreadLocalRandom.current().nextInt(totalWeight);for (int i = 0; i < length; i) {offset -= weights[i];if (offset < 0) {return invokers.get(i);}}}// 所有服务权重相等则随机选择return invokers.get(ThreadLocalRandom.current().nextInt(length));}}
简单轮询含义是服务消费者每次会依次访问一个服务提供者,并且从概率角度看每个提供者被访问概率一致,可以通过原子变量累加实现。第一步编写简单轮询策略
public class RoundRobinBalance extends AbstractLoadBalance {private AtomicInteger atomicIndex = new AtomicInteger(0);@Overridepublic MyServer doSelect(List<MyServer> serverList) {// atomicIndex自增大于服务器数量时取余int index = atomicIndex.getAndIncrement() % serverList.size();return serverList.get(index);}}
第二步编写测试代码
public class LoadBalanceTest {public static void main(String[] args) {List<MyServer> serverList = buildData();testRoundRobinBalance(serverList);}public static void testRoundRobinBalance(List<MyServer> serverList) {AbstractLoadBalance roundRobinBalance = new RoundRobinBalance();for (int i = 0; i < 10; i) {MyServer server = roundRobinBalance.select(serverList);System.out.println("RoundRobinBalance route server=" server);}}public static List<MyServer> buildData() {List<MyServer> serverList = new ArrayList<MyServer>();MyServer server1 = new MyServer("192.1.1.1");MyServer server2 = new MyServer("192.1.1.2");MyServer server3 = new MyServer("192.1.1.3");serverList.add(server1);serverList.add(server2);serverList.add(server3);return serverList;}}
第三步输出结果
RoundRobinBalance route server=MyServer(ip=192.1.1.1)RoundRobinBalance route server=MyServer(ip=192.1.1.2)RoundRobinBalance route server=MyServer(ip=192.1.1.3)RoundRobinBalance route server=MyServer(ip=192.1.1.1)RoundRobinBalance route server=MyServer(ip=192.1.1.2)RoundRobinBalance route server=MyServer(ip=192.1.1.3)RoundRobinBalance route server=MyServer(ip=192.1.1.1)RoundRobinBalance route server=MyServer(ip=192.1.1.2)RoundRobinBalance route server=MyServer(ip=192.1.1.3)RoundRobinBalance route server=MyServer(ip=192.1.1.1)
简单加权轮询新增了权重概念,假设服务器A权重等于1,服务器B权重等于5,从概率角度看B服务器被访问概率5倍于A服务器,我们还是使用概率区间这个思路,相较于加权随机会有一些变化。
第一步编写简单加权轮询策略
public class RoundRobinWeightBalance1 extends AbstractLoadBalance {private AtomicInteger atomicIndex = new AtomicInteger(0);@Overridepublic MyServer doSelect(List<MyServer> serverList) {int totalWeight = 0;int firstWeight = serverList.get(0).getWeight();boolean sameWeight = true;for (MyServer server : serverList) {totalWeight= server.getWeight();if (sameWeight && server.getWeight() != firstWeight) {sameWeight = false;}}if (!sameWeight) {// 自增方式计算offsetint offset = atomicIndex.getAndIncrement() % totalWeight;for (MyServer server : serverList) {if (offset < server.getWeight()) {return server;}offset -= server.getWeight();}}int index = atomicIndex.getAndIncrement() % serverList.size();return serverList.get(index);}}
第二步编写测试代码
public class LoadBalanceTest {public static void main(String[] args) {List<MyServer> serverList = buildData();testRoundRobinWeightBalance1(serverList);}public static void testRoundRobinWeightBalance1(List<MyServer> serverList) {AbstractLoadBalance roundRobinBalance = new RoundRobinWeightBalance1();for (int i = 0; i < 10; i) {MyServer server = roundRobinBalance.select(serverList);System.out.println("RoundRobinWeightBalance1 route server=" server);}}public static List<MyServer> buildData() {List<MyServer> serverList = new ArrayList<MyServer>();MyServer server1 = new MyServer("192.1.1.1", 3);MyServer server2 = new MyServer("192.1.1.2", 5);MyServer server3 = new MyServer("192.1.1.3", 2);serverList.add(server1);serverList.add(server2);serverList.add(server3);return serverList;}}
第三步输出结果
RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2)
简单加权轮询有什么问题?我们分析其输出结果发现,连续3次访问服务器1,连续5次访问服务器2,连续2次访问服务器3,所以简单加权轮询策略会导致请求集中问题。
RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2)RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2)
所以需要使用平滑加权轮询策略,将请求比较均匀地分散至各个服务器,下图说明了计算步骤:
第一步编写服务器代码
public class MyServer {private String ip;private int weight;private int currentWeight = 0;public MyServer(String ip) {this.ip = ip;}public MyServer(String ip, int weight) {this.ip = ip;this.weight = weight;}public int getWeight() {return weight;}public void setWeight(int weight) {this.weight = weight;}public int getCurrentWeight() {return currentWeight;}public void setCurrentWeight(int currentWeight) {this.currentWeight = currentWeight;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}}
第二步编写平滑加权轮询策略
public class RoundRobinWeightBalance2 extends AbstractLoadBalance {private AtomicInteger atomicIndex = new AtomicInteger(0);@Overridepublic MyServer doSelect(List<MyServer> serverList) {int totalWeight = 0;int firstWeight = serverList.get(0).getWeight();boolean sameWeight = true;for (MyServer server : serverList) {totalWeight= server.getWeight();if (sameWeight && server.getWeight() != firstWeight) {sameWeight = false;}// 设置动态权重 -> currentWeight= weightserver.setCurrentWeight(server.getCurrentWeight() server.getWeight());}if (!sameWeight) {// 最大动态权重服务器 -> max(currentWeight)MyServer maxCurrentWeightServer = serverList.stream().max((s1, s2) -> s1.getCurrentWeight() - s2.getCurrentWeight()).get();// 设置最大动态权重 -> max(currentWeight) - totalWeightmaxCurrentWeightServer.setCurrentWeight(maxCurrentWeightServer.getCurrentWeight() - totalWeight);// 返回最大动态权重服务器return maxCurrentWeightServer;}// 权重相同依次轮询int index = atomicIndex.getAndIncrement() % serverList.size();return serverList.get(index);}}
第三步编写测试代码
public class LoadBalanceTest {public static void main(String[] args) {List<MyServer> serverList = buildData();testRoundRobinWeightBalance2(serverList);}public static void testRoundRobinWeightBalance2(List<MyServer> serverList) {AbstractLoadBalance roundRobinBalance = new RoundRobinWeightBalance2();for (int i = 0; i < 10; i) {MyServer server = roundRobinBalance.select(serverList);System.out.println("RoundRobinWeightBalance2 route server=" server);}}public static List<MyServer> buildData() {List<MyServer> serverList = new ArrayList<MyServer>();MyServer server1 = new MyServer("192.1.1.1", 3);MyServer server2 = new MyServer("192.1.1.2", 5);MyServer server3 = new MyServer("192.1.1.3", 2);serverList.add(server1);serverList.add(server2);serverList.add(server3);return serverList;}}
第四步输出结果
RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=-5)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.1, weight=3, currentWeight=-4)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.3, weight=2, currentWeight=-4)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=0)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.1, weight=3, currentWeight=-5)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=0)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=-5)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.3, weight=2, currentWeight=-4)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.1, weight=3, currentWeight=-3)RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=0)
public class RoundRobinLoadBalance extends AbstractLoadBalance {public static final String NAME = "roundrobin";private static int RECYCLE_PERIOD = 60000;protected static class WeightedRoundRobin {// 权重private int weight;// 动态权重private AtomicLong current = new AtomicLong(0);// 更新时间private long lastUpdate;public int getWeight() {return weight;}public void setWeight(int weight) {this.weight = weight;current.set(0);}public long increaseCurrent() {return current.addAndGet(weight);}public void sel(int total) {current.addAndGet(-1 * total);}public long getLastUpdate() {return lastUpdate;}public void setLastUpdate(long lastUpdate) {this.lastUpdate = lastUpdate;}}private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();private AtomicBoolean updateLock = new AtomicBoolean();protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() "." invocation.getMethodName();Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);if (map != null) {return map.keySet();}return null;}@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() "." invocation.getMethodName();ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);if (map == null) {methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());map = methodWeightMap.get(key);}// 总权重int totalWeight = 0;// 最大当前权重long maxCurrent = Long.MIN_VALUE;// 当前时间long now = System.currentTimeMillis();// 选中提供者Invoker<T> selectedInvoker = null;// 选中提供者权重对象WeightedRoundRobin selectedWRR = null;// 遍历所有提供者for (Invoker<T> invoker : invokers) {String identifyString = invoker.getUrl().toIdentityString();WeightedRoundRobin weightedRoundRobin = map.get(identifyString);// 获取权重int weight = getWeight(invoker, invocation);if (weightedRoundRobin == null) {weightedRoundRobin = new WeightedRoundRobin();weightedRoundRobin.setWeight(weight);map.putIfAbsent(identifyString, weightedRoundRobin);}if (weight != weightedRoundRobin.getWeight()) {weightedRoundRobin.setWeight(weight);}// 选择动态权重最大提供者long cur = weightedRoundRobin.increaseCurrent();weightedRoundRobin.setLastUpdate(now);if (cur > maxCurrent) {maxCurrent = cur;selectedInvoker = invoker;selectedWRR = weightedRoundRobin;}// 计算总权重totalWeight= weight;}// 更新负载均衡容器if (!updateLock.get() && invokers.size() != map.size()) {if (updateLock.compareAndSet(false, true)) {try {ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();newMap.putAll(map);Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();while (it.hasNext()) {Entry<String, WeightedRoundRobin> item = it.next();if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {it.remove();}}methodWeightMap.put(key, newMap);} finally {updateLock.set(false);}}}if (selectedInvoker != null) {// 最大动态权重减去总权重selectedWRR.sel(totalWeight);return selectedInvoker;}return invokers.get(0);}}
一致性哈希策略具有三个显著特性:第一是在不新增或者删除提供者情况下,同一个客户端总是可以访问到同一个提供者。第二是一致性哈希可以有效分散新增或者删除服务提供者带来的波动性。第三是一致性哈希虚拟节点可以更加有效分散特性二之波动性。
在不新增或者删除提供者情况下,同一个客户端总是可以访问同一个提供者。第一步6个提供者分布在哈希环中,第二步client1发起访问请求,此时计算客户端哈希值位于哈希环位置,第三步沿着哈希环顺时针旋转,找到与客户端哈希值最近的提供者server5。当哈希环结构不发生改变时,client1总是路由到server5。
哈希环应该选择什么数据结构呢?我们可以选择TreeMap构建哈希环,其底层使用了红黑树。
public class TreeMapTest {public static void main(String[] args) {// 构建哈希环TreeMap<Integer, MyServer> treeMap = new TreeMap<Integer, MyServer>();treeMap.put(1, new MyServer("1"));treeMap.put(2, new MyServer("2"));treeMap.put(3, new MyServer("3"));treeMap.put(4, new MyServer("4"));treeMap.put(5, new MyServer("5"));treeMap.put(6, new MyServer("6"));// 找到第一个大于客户端哈希值的服务器Integer clientHashCode = 5;SortedMap<Integer, MyServer> tailMap = treeMap.tailMap(clientHashCode, false);MyServer server = tailMap.get(tailMap.firstKey());// MyServer(ip=6)System.out.println(server);}}
一致性哈希可以有效分散新增或者删除提供者带来的波动性,例如新增服务器server7,但是并不影响client1路由结果:
服务器server5发生宕机只会影响client1路由结果,并不会影响其它客户端路由结果:
一致性哈希虚拟节点可以更加有效分散特性二之波动性,例如我们可以为每个服务器节点新增一个虚拟节点,使得服务器分散得更加均匀:
第一步编写基础负载均衡策略
public abstract class AbstractConsistentHashLoadBalance {public MyServer select(String clientIP, List<MyServer> serverList) {return doSelect(clientIP, serverList);}public abstract MyServer doSelect(String clientIP, List<MyServer> serverList);}
第二步编写一致性哈希策略
public class ConsistentHashBalance1 extends AbstractConsistentHashLoadBalance {private ConsistentHashSelector consistentHashSelector;@Overridepublic MyServer doSelect(String clientIP, List<MyServer> serverList) {initialConsistentHashSelector(serverList);return consistentHashSelector.select(clientIP);}private class ConsistentHashSelector {private Integer identityHashCode;private TreeMap<Integer /* hashcode */, MyServer> serverNodes = new TreeMap<Integer, MyServer>();// 构建哈希环public ConsistentHashSelector(Integer identityHashCode, List<MyServer> serverList) {this.identityHashCode = identityHashCode;TreeMap<Integer, MyServer> newServerNodes = new TreeMap<Integer, MyServer>();for (MyServer server : serverList) {newServerNodes.put(hashCode(server.getIp()), server);}serverNodes = newServerNodes;}// 根据客户端IP路由public MyServer select(String clientIP) {// 计算客户端哈希值int clientHashCode = hashCode(clientIP);// 找到第一个大于客户端哈希值的服务器SortedMap<Integer, MyServer> tailMap = serverNodes.tailMap(clientHashCode, false);if (CollectionUtils.isEmpty(tailMap)) {Integer firstKey = serverNodes.firstKey();return serverNodes.get(firstKey);}// 找不到表示在最后一个节点和第一个节点之间 ->选择第一个节点Integer firstKey = tailMap.firstKey();return tailMap.get(firstKey);}// 计算哈希值private int hashCode(String key) {return Objects.hashCode(key);}// 提供者列表哈希值 -> 如果新增或者删除提供者会发生变化public Integer getIdentityHashCode() {return identityHashCode;}}private void initialConsistentHashSelector(List<MyServer> serverList) {// 计算提供者列表哈希值Integer newIdentityHashCode = System.identityHashCode(serverList);// 提供者列表哈希值没有变化则无需重新构建哈希环if (null != consistentHashSelector && (null != consistentHashSelector.getIdentityHashCode() && newIdentityHashCode == consistentHashSelector.getIdentityHashCode())) {return;}// 提供者列表哈希值发生变化则重新构建哈希环consistentHashSelector = new ConsistentHashSelector(newIdentityHashCode, serverList);}}
第三步编写测试代码
public class LoadBalanceTest {public static void main(String[] args) {testConsistentHashBalance1();}public static void testConsistentHashBalance1() {List<MyServer> serverList = new ArrayList<MyServer>();MyServer server1 = new MyServer("1");MyServer server2 = new MyServer("2");MyServer server3 = new MyServer("3");MyServer server4 = new MyServer("4");MyServer server5 = new MyServer("5");MyServer server6 = new MyServer("6");serverList.add(server1);serverList.add(server2);serverList.add(server3);serverList.add(server4);serverList.add(server5);serverList.add(server6);AbstractConsistentHashLoadBalance consistentHashBalance = new ConsistentHashBalance1();for (int i = 0; i < 10; i) {String clientIP = "5";MyServer server = consistentHashBalance.select(clientIP, serverList);System.out.println("clientIP=" clientIP ",consistentHashBalance1 route server=" server);}}}
第四步输出结果
clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)
如果新增虚拟节点参看以下代码
public class ConsistentHashBalance2 extends AbstractConsistentHashLoadBalance {private ConsistentHashSelector consistentHashSelector;@Overridepublic MyServer doSelect(String clientIP, List<MyServer> serverList) {initialSelector(serverList);return consistentHashSelector.select(clientIP);}private class ConsistentHashSelector {private Integer identityHashCode;private Integer VIRTUAL_NODES_NUM = 16;private TreeMap<Integer /* hashcode */, MyServer> serverNodes = new TreeMap<Integer, MyServer>();public ConsistentHashSelector(Integer identityHashCode, List<MyServer> serverList) {this.identityHashCode = identityHashCode;TreeMap<Integer, MyServer> newServerNodes = new TreeMap<Integer, MyServer>();for (MyServer server : serverList) {// 虚拟节点for (int i = 0; i < VIRTUAL_NODES_NUM; i) {int virtualKey = hashCode(server.getIp() "_" i);newServerNodes.put(virtualKey, server);}}serverNodes = newServerNodes;}public MyServer select(String clientIP) {int clientHashCode = hashCode(clientIP);SortedMap<Integer, MyServer> tailMap = serverNodes.tailMap(clientHashCode, false);if (CollectionUtils.isEmpty(tailMap)) {Integer firstKey = serverNodes.firstKey();return serverNodes.get(firstKey);}Integer firstKey = tailMap.firstKey();return tailMap.get(firstKey);}private int hashCode(String key) {return Objects.hashCode(key);}public Integer getIdentityHashCode() {return identityHashCode;}}private void initialSelector(List<MyServer> serverList) {Integer newIdentityHashCode = System.identityHashCode(serverList);if (null != consistentHashSelector && (null != consistentHashSelector.getIdentityHashCode() && newIdentityHashCode == consistentHashSelector.getIdentityHashCode())) {return;}consistentHashSelector = new ConsistentHashSelector(newIdentityHashCode, serverList);}}
public class ConsistentHashLoadBalance extends AbstractLoadBalance {public static final String NAME = "consistenthash";private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String methodName = RpcUtils.getMethodName(invocation);String key = invokers.get(0).getUrl().getServiceKey() "." methodName;int identityHashCode = System.identityHashCode(invokers);ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);// 提供者列表哈希值发生变化则重新构建哈希环if (selector == null || selector.identityHashCode != identityHashCode) {selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));selector = (ConsistentHashSelector<T>) selectors.get(key);}return selector.select(invocation);}private static final class ConsistentHashSelector<T> {private final TreeMap<Long, Invoker<T>> virtualInvokers;private final int replicaNumber;private final int identityHashCode;private final int[] argumentIndex;// 构建哈希环ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {this.virtualInvokers = new TreeMap<Long, Invoker<T>>();this.identityHashCode = identityHashCode;URL url = invokers.get(0).getUrl();this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));argumentIndex = new int[index.length];for (int i = 0; i < index.length; i) {argumentIndex[i] = Integer.parseInt(index[i]);}for (Invoker<T> invoker : invokers) {String address = invoker.getUrl().getAddress();// 新增虚拟节点(默认160个)for (int i = 0; i < replicaNumber / 4; i) {byte[] digest = md5(address i);for (int h = 0; h < 4; h) {long m = hash(digest, h);virtualInvokers.put(m, invoker);}}}}// 负载均衡public Invoker<T> select(Invocation invocation) {String key = toKey(invocation.getArguments());byte[] digest = md5(key);return selectForKey(hash(digest, 0));}private String toKey(Object[] args) {StringBuilder buf = new StringBuilder();for (int i : argumentIndex) {if (i >= 0 && i < args.length) {buf.append(args[i]);}}return buf.toString();}private Invoker<T> selectForKey(long hash) {Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);if (entry == null) {entry = virtualInvokers.firstEntry();}return entry.getValue();}// 哈希运算private long hash(byte[] digest, int number) {return (((long) (digest[3 number * 4] & 0xFF) << 24)| ((long) (digest[2 number * 4] & 0xFF) << 16)| ((long) (digest[1 number * 4] & 0xFF) << 8)| (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL;}private byte[] md5(String value) {MessageDigest md5;try {md5 = MessageDigest.getInstance("MD5");} catch (NoSuchAlgorithmException e) {throw new IllegalStateException(e.getMessage(), e);}md5.reset();byte[] bytes = value.getBytes(StandardCharsets.UTF_8);md5.update(bytes);return md5.digest();}}}
每个提供者维护并发处理的任务个数,任务个数越大活跃度越高。在服务消费者进行负载均衡时,第一查询提供者负载情况,第二选择活跃度最低的提供者,我们直接分析DUBBO源码:
public class LeastActiveLoadBalance extends AbstractLoadBalance {public static final String NAME = "leastactive";@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {// invoker数量int length = invokers.size();// 最小调用次数int leastActive = -1;// 调用次数等于最小次数invoker数量int leastCount = 0;// 调用次数等于最小调用次数invoker下标集合int[] leastIndexes = new int[length];// 每个服务提供者权重int[] weights = new int[length];// 总权重int totalWeight = 0;// 第一个调用者权重int firstWeight = 0;// 权重值是否相同boolean sameWeight = true;// 遍历invokersfor (int i = 0; i < length; i) {Invoker<T> invoker = invokers.get(i);// 调用次数int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();// 获取权重int afterWarmup = getWeight(invoker, invocation);// 设置权重weights[i] = afterWarmup;// 第一个invoker或者调用次数小于最小调用次数if (leastActive == -1 || active < leastActive) {leastActive = active;leastCount = 1;leastIndexes[0] = i;totalWeight = afterWarmup;firstWeight = afterWarmup;sameWeight = true;}// 当前服务提供者调用次数等于最小调用次数else if (active == leastActive) {// 记录下标leastIndexes[leastCount] = i;// 新增总权重值totalWeight= afterWarmup;// 权重值是否相同if (sameWeight && i > 0 && afterWarmup != firstWeight) {sameWeight = false;}}}// 只有一个invoker调用次数等于最小调用次数直接返回if (leastCount == 1) {return invokers.get(leastIndexes[0]);}// 多个invoker调用次数等于最小调用次数并且权重值不相同->根据权重值选择if (!sameWeight && totalWeight > 0) {int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);for (int i = 0; i < leastCount; i) {int leastIndex = leastIndexes[i];offsetWeight -= weights[leastIndex];if (offsetWeight < 0) {return invokers.get(leastIndex);}}}// 多个invoker调用次数等于最小调用次数并且权重值相同->随机选择return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);}}
第一本文首先分析了负载均衡、集群容错、服务降级这三组概念,第二结合代码分析了简单随机,加权随机,简单轮询,简单加权轮询,平滑加权轮询,一致性哈希,最少活跃数七种负载均衡策略,其中权重计算、平滑加权轮询,一致性哈希算法值得注意,希望本文对大家有所帮助。
欢迎大家关注今日头条号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考
声明:文章仅代表原作者观点,不代表本站立场;如有侵权、违规,可直接反馈本站,我们将会作修改或删除处理。
图文推荐
2023-01-16 12:37:52
2023-01-13 17:30:49
2023-01-01 10:53:04
2023-01-01 10:47:31
2023-01-01 10:17:20
2023-01-01 10:11:57
热点排行
精彩文章
2023-01-01 09:47:20
2023-01-01 09:41:40
2023-01-01 08:53:19
2022-12-31 19:05:28
2022-12-31 18:47:03
2022-12-31 18:41:02