一致性哈希实现对 WebSocket 负载均衡

**目标:**使用 Gateway LoadBalance 实现一致性哈希算法,使得请求负载均衡到 Netty 集群

需求:
① 同一用户的请求都打在同一节点
② 同一空间的用户尽可能与同一节点建立连接
③ 不同热度的空间尽可能在不同的节点上
④ 动态增删节点时,能自动调整长连接到正确的结点上

1. 一致性哈希算法的设计

目标:通过空间 ID、空间热度和各节点负载情况,确定要进行连接建立的结点

使用一致性哈希算法,要求 ① 已经满足了,同一用户的请求一定是打在同一节点

**需求 ②:**同一个空间内的用户经常需要互相获取信息(移动、聊天等),这些信息存储在用户连接的节点上。所以处于同一个空间的用户要尽可能地连接到同一个节点,同一个节点可以直接获取信息,而跨节点的信息则需要进行远程调用会拉低速度,在空间内用户较少时跨空间调用应保证只占少部分(用户多的话需要使用多个节点进行负载了)

**需求 ③:**不同空间的热度也有所不同,有的空间用户多而有的很少。应该尽可能使用户多的空间分配在不同的结点,避免数据倾斜导致负载效果差。且空间的热度也会发生变化,需要实时调整。

1.1. 传统一致性哈希存在的问题

如果应用传统的一致性哈希算法,那么就是使用 空间 ID 求一个哈希值,打到哈希环上,并寻找最近的节点作为目标。

  1. 这种方法只有在节点数量固定时才可以满足需求 1,使得同一空间的用户处在同一节点上

    1. 但也仅仅只会使得同一空间的连接请求都打到同一个节点,而无法负载到其他节点,对于热点空间来说没有起到负载均衡应有的效果
    2. 虽然可以使用带最大负载值的方法来使得跳过到达最大负载值的节点,但这样下来就是打满一个节点再打下一个,负载均衡的效果依旧很差
  2. 更糟糕的是,如果增加节点,那么这个节点要么承载同一个空间的所有节点,要么不承载同一个空间的所有节点,无法对热点空间起到负载均衡的效果,如下图:

    这时候如果新增的节点处在 ① 的位置,那么它将会承载空间 1 所有 600 用户的连接,而 server2-2 完全不会帮忙负载这 600 个用户

    而如果处在 ② 的位置,则不会负载任何用户,没有起到新增节点平摊负载的效果

  3. 无法满足需求 2,仅仅通过对空间 ID 取哈希值,无法确保不同热度的空间负载到不同的节点,如图 1 出现了严重的倾斜

1.2. 空间受理配置代替直接哈希

对于上边提到的问题,总结起来就是以下两点:

  1. 直接通过空间 ID 进行哈希,会导致同一空间所有用户的哈希值都相同,从而使得同一个空间的所有用户只能由同一个节点处理,无法按用户粒度进行负载,也导致了新增节点时没有起到分摊压力的效果,减少节点时下一个节点将承受一整个空间的用户
  2. 直接通过空间 ID 进行哈希无法使用空间当前的人数作为参考,导致没办法将不同热度的空间负载到不同的节点

找出了这两个问题,就可以针对性地进行解决了

  1. 首先,既然空间 ID 的哈希值都相同,那我们就不使用空间 ID 进行哈希了,而是使用用户 ID,这样就能够按用户粒度将用户均摊在哈希环上了

  2. 但是怎么保证同一个空间的连接请求尽可能打在同一个节点上呢?这里我的做法是为每个节点配置可受理的空间列表,如下图:

    为每个节点配置可受理的空间后,用户就不再是寻找下一个节点了,而是寻找下一个可受理要连接的空间的节点,这样依旧可以做到对同一空间的连接都尽可能在同一节点上。
    同时由于使用了用户 ID 进行哈希,用户可以在哈希环均匀地分布了,从而使得同一空间的连接也能负载到不同的节点(如节点 2 和节点 3 都处理了空间 1 的连接)

  3. 为每个节点配置可受理的空间后,需求 2 也可以满足了,只需要将热度不同的空间配置到不同的节点,就可以解决这个问题了。

    由于空间的热度不是一成不变的,有时候某个空间有活动,那么热度就会高一些,而一些原本热度高的也会出现热度下降的情况。这时候可配置的受理空间也能派上用场。当一个空间用户主键增多,现有的节点无法承载时,那么可以使当前负载比较轻松的节点受理该空间,而当空间热度下降时也可以撤销某些节点对该空间的受理

    受理空间的增加和撤销操作,和节点的增加和撤销的处理逻辑基本上是相同的,都是将影响范围内的连接断开,触发客户端的重连机制使得连接作用到新的可受理节点(类比新节点)

    例如空间 1 的热度持续上升,我们可以使节点 1 也受理空间 1,那这时候会导致用户 4 的连接发生变化,服务端主动断开该连接,客户端触发重连之后,连接就到了 server1-2 上,从而完成仅对空间 1 的局部扩容

1.3. 哈希环的实现

各节点和用户都需要保存到哈希环上,需要各采用一个哈希环来保存。环上的各节点需要按哈希值顺序存放,因为有 ”按顺序寻找下一个节点“ 的需求。

对于 排序+检索 场景,效率较高的是使用 红黑树 实现(参考:一致性哈希数据结构选择),目前先使用 Java 的实现(TreeMap,使用扩展的接口 NavigableMap),这样的实现就只能应用到单机的网关,如果网关需要搭建集群,那哈希环就得存放到 Redis 了(使用 zset)

原算法的描述是先有真实节点在哈希环上,再产生多个虚拟节点。但实际实现可以不拘囿于这个描述。结合设计,服务节点需要包含的信息包括了:IP、端口、受理空间、最大负载用户,如果环上每个虚拟节点都保存这些信息就太浪费了,所以我决定将服务实体抽离出来,哈希环上全部方置虚拟节点,通过 IP 找到服务实体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ConsistentHashing {
/** 节点哈希环 (hash -> serverId) */
private final NavigableMap<Integer, String> serverRing;
/** 用户哈希环 (hash -> userEntry) */
private final NavigableMap<Integer, UserEntry> userRing;
/** 服务实例 (serverId -> serverEntry) */
private final Map<String, ServerEntry> serverMap;
}

public class ServerEntry {
/** 节点 IP 地址 */
private final String ip;
/** 节点端口 */
private final Integer port;
/** 节点最大负载 */
private Integer maxUserNum;
/** 节点受理的空间 ID 集合 */
private Set<Integer> accessSpaces;

/** 当前节点总用户连接数 */
private Integer userNum;
/** 拥有的所有虚拟节点的哈希值列表 */
private List<Integer> nodeHashList;
}

public class UserEntry {
/** 用户 ID */
private final Integer userId;
/** 所处空间 ID */
private final Integer spaceId;
}

其中服务实体中还额外保存了所有虚拟节点的哈希值列表,用于服务下线时,可以很方便地将该服务的所有虚拟节点从哈希环上清除掉

用户的哈希环上,保存了用户 ID 和空间 ID,在用户状态发生变化时(需要更换连接、用户上下线等)可以使用到

详细的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 使用一致性哈希算法,选择一个节点
* @param instances 现有所有实例列表
* @param userId 请求的用户 ID
* @param spaceId 请求的空间 ID
* @return 要提供服务的的节点实例
*/
public ServiceInstance getInstance(List<ServiceInstance> instances, String userId, String spaceId) {
// 对用户 ID 取哈希,放到用户哈希环
int userHash = getHash(String.join("-", userId, spaceId));
int uId = Integer.parseInt(userId);
int sId = Integer.parseInt(spaceId);
userRing.put(userHash, new UserEntry(uId, sId));

// 把遍历到的节点存储起来,防止重复遍历虚拟节点
Set<String> visited = new HashSet<>(serverMap.size());
// 遍历节点哈希环,找到第一个能受理所请求空间的
Map.Entry<Integer, String> ringEntry = serverRing.higherEntry(userHash);
while (ringEntry != null) {
// 获取虚拟节点对应的服务节点 ID
String serverId = ringEntry.getValue();
if (!visited.contains(serverId)) {
visited.add(serverId);
// 找到能受理的节点则退出
ServerEntry serverEntry = serverMap.get(serverId);
if (serverEntry != null && serverEntry.accessSpace(sId)) {
return getInstance(instances, serverEntry);
}
}
// 否则继续寻找下一个节点
ringEntry = serverRing.higherEntry(ringEntry.getKey());
}

// 没找到的话,则尝试寻找第一个(环的概念)
ServerEntry serverEntry = serverMap.get(serverRing.firstEntry().getValue());
if (serverEntry != null && serverEntry.accessSpace(sId)) {
return getInstance(instances, serverEntry);
}

// 否则说明没有任何节点可以受理这个空间,返回空
return null;
}

/**
* 获取服务节点对应的实例
* @param instances 现有所有实例列表
* @param serverEntry 选中的服务节点
* @return 服务节点对应的实例
*/
private ServiceInstance getInstance(List<ServiceInstance> instances, ServerEntry serverEntry) {
// 遍历所有实例,找到 IP 和端口都相同的
for (ServiceInstance instance : instances) {
if (serverEntry.getIp().equals(instance.getHost()) &&
serverEntry.getPort().equals(instance.getPort())) {
return instance;
}
}
// 寻找到目标节点,却在实例中没有找到,说明该服务实际已经下线,补偿一下
serverRemove(serverEntry.getIp(), serverEntry.getPort());
return null;
}

2. 动态扩缩容的设计

一致性哈希算法解决了前三个需求,最后一个动态扩缩容就需要 Nacos 来帮忙了

**需求 ④:**随着用户数量的增多,节点扩容时必须要做的,扩容后部分的用户连接就需要进行重连了,缩容也是一样的道理。而在一致性哈希算法的设计中,每个节点的受理空间也是会发生变化的,这也会导致用户连接发生变化。在这两种情况下要能够无感知地切换用户连接到不同的节点

2.1. Nacos 监听

Nacos 可以监听服务的上下线以及配置的变化,通过创建监听器来确定进行扩缩容的时机。但 Nacos 的下线判断时间较长(15s 健康检查 + 15s 等待,一共需要 30s 才能判断一个节点下线),这部分可以考虑优化,目前就先这样用着吧

在实际的实现中,发现 Nacos 的监听器每次都会返回全量的数据,而不是说哪些发生变化就返回哪些。且事件没有类型,无论是服务上下线了还是元数据改变了,都会触发一次。所以就自己写了个小算法处理了一下,能够准确区分出:节点上线、节点下线和节点元数据改变这三种情况:

其中 serverUrl 为 Nacos 的地址,可以加端口号,没加默认为 8848

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/** 存储最后一次更新的实例状态,用于下一次判断服务更变情况 */
private Map<String, Instance> lastInstanceMap = new HashMap<>(0);
private void serverListener() throws NacosException {
NamingService namingService = NacosFactory.createNamingService(serverUrl);
namingService.subscribe("chatspace-netty", event -> {
// 本次实例情况
List<Instance> instances = ((NamingEvent) event).getInstances();
// 本次服务映射
Map<String, Instance> nowInstanceMap = new HashMap<>(instances.size());

// 筛选出发生改变的服务
List<Instance> newInstances = new ArrayList<>(instances.size());
List<Instance> removedInstances = new ArrayList<>(instances.size());
List<Instance> metadataUpdates = new ArrayList<>(instances.size());
for (Instance i : instances) {
String id = i.getIp() + ":" + i.getPort();
nowInstanceMap.put(id, i);
// 从旧实例中取出并删除
Instance lastI = lastInstanceMap.remove(id);
if (lastI == null) {
// 实例不存在,则为新增的
newInstances.add(i);
} else if (!lastI.getMetadata().equals(i.getMetadata())) {
// 实例存在且元数据发生改变
metadataUpdates.add(i);
}
}
// 最后剩下的实例即为删除的
removedInstances.addAll(lastInstanceMap.values());
lastInstanceMap = nowInstanceMap;

log.info("新增的实例({}):{}", newInstances.size(), newInstances);
log.info("移除的实例({}):{}", removedInstances.size(), removedInstances);
log.info("配置发生变化的实例({}):{}", metadataUpdates.size(), metadataUpdates);

// 更新哈希环
consistentHashing.serverAdd(newInstances);
consistentHashing.serverRemove(removedInstances);
consistentHashing.serverMetadataUpdate(metadataUpdates);
});
}

2.2. 节点扩缩容

如果发生 节点缩容 了,那一定是 Nacos 检测到某个节点已经不可用了。这时候该节点持有的所有用户连接已经全部断开了,所以只需要将该节点从服务实体和哈希环中移除就好(但前边提到过 Nacos 判断节点下线会有 30s 的延迟,这 30s 内,这些用户连接的重连还依旧会达到这个节点上,导致用户连接一直失败,所以在节点下线时会有 30s 的服务不可用时间)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Netty 节点移除
* @param removeInstances 移除的节点实例列表
*/
public void serverRemove(List<Instance> removeInstances) {
if (removeInstances.isEmpty()) {
return;
}
// 简单地将节点从哈希环中去除即可,由客户端进行重连
removeInstances.forEach(i -> serverRemove(i.getIp(), i.getPort()));
}

/**
* 通过 IP 和端口移除服务节点
* @param ip 节点 IP 地址
* @param port 节点端口号
*/
private void serverRemove(String ip, int port) {
String id = ip + ":" + port;
ServerEntry entry = serverMap.remove(id);
if (entry != null) {
// 从哈希环上移除掉该节点的所有虚拟节点
serverRing.keySet().removeAll(entry.getNodeHashList());
log.info("成功移除节点:{}", id);
} else {
// 没有可移除的
log.error("没有可移除的节点:{}", id);
}
}

如果发生 节点扩容,不仅需要创建服务实体和在哈希环上放置虚拟节点,还需要计算出哪些用户连接因为新节点的出现而需要重新连接。这些用户连接也比较好

2.3. 元数据更改(扩缩容的另一种表现)

2.4. 用户连接无感知切换

3. 用户通讯的设计

4. 自定义负载均衡器

一致性哈希算法设计完成后,就是将其应用到负载均衡器了。这里需要在 Gateway 进行负载均衡时,加入自己的逻辑,方法就是自定义一个负载均衡器覆盖掉默认的

environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME)
返回为null

**解决:**不能将负载均衡器注册为 Bean,而是使用 @LoadBalancerClient@LoadBalancerClients 进行注册