背景
有这么一道比较有意思的面试题:“Redis 的有序集合底层为什么要用跳表”
Redis 的有序集合
Redis 有个比较常用的数据结构叫有序集合(sorted set,简称 zset),正如其名它是一个可以保证有序且元素唯一的集合,所以它经常用于排行榜等需要进行统计排列的场景。
因为设计者考虑到 Redis 数据存放于内存,为了节约宝贵的内存空间,在有序集合元素小于 64 字节且个数小于 128 的时候,会使用 ziplist,而这个阈值的默认值的设置就来自下面这两个配置项。
zset-max-ziplist-value 64
zset-max-ziplist-entries 128
一旦有序集合中的某个元素超出这两个其中的一个阈值它就会转为 skiplist(实际是 dict+skiplist,还会借用字典来提高获取指定元素的效率)。
也就是说,ZSet 有两种不同的实现,分别是 ziplist 和 skiplist,具体使用哪种结构进行存储的规则如下:
- 当有序集合对象同时满足以下两个条件时,使用 ziplist:
- ZSet 保存的键值对数量少于 128 个;
- 每个元素的长度小于 64 字节。
- 如果不满足上述两个条件,那么使用 skiplist
跳表
有序链表在添加、查询、删除的平均时间复杂都都是 O(n) 即线性增长,所以一旦节点数量达到一定体量后其性能表现就会非常差劲。而跳表我们完全可以理解为在原始链表基础上,建立多级索引,通过多级索引检索定位将增删改查的时间复杂度变为 O(log n) 。可能这里说的有些抽象,我们举个例子,以下图跳表为例,其原始链表存储按序存储 1-10,有 2 级索引,每级索引的索引个数都是基于下层元素个数的一半。
查询节点
假如我们需要查询元素 6,其工作流程如下:
- 从 2 级索引开始,先来到节点 4。
- 查看 4 的后继节点,是 8 的 2 级索引,这个值大于 6,说明 2 级索引后续的索引都是大于 6 的,没有再往后搜寻的必要
- 我们索引向下查找。来到 4 的 1 级索引,比对其后继节点为 6,查找结束。
相较于原始有序链表需要 6 次,我们的跳表通过建立多级索引,我们只需两次就直接定位到了目标元素,其查寻的复杂度被直接优化为O(log n)。
添加节点
假如我们需要在这个有序集合中添加一个元素 7,那么我们就需要通过跳表找到小于元素 7 的最大值,也就是元素 6 的位置,将其插入到元素 6 的后面,让元素 6 的索引指向新插入的节点 7,其工作流程如下:
- 从 2 级索引开始定位到了元素 4 的索引。
- 查看索引 4 的后继索引为 8,索引向下推进。
- 来到 1 级索引,发现索引 4 后继索引为 6,小于插入元素 7,指针推进到索引 6 位置。- 继续比较 6 的后继节点为索引 8,大于元素 7,索引继续向下。
- 最终我们来到 6 的原始节点,发现其后继节点为 7,指针没有继续向下的空间,自此我们可知元素 6 就是小于插入元素 7 的最大值,于是便将元素 7 插入。
索引高度
理想情况是每一层索引是下一层元素个数的二分之一,所以假设元素个数为 n,那么对应 k 层索引的元素个数 r 计算公式为: r=n/2^k
而 Redis 又是内存数据库,我们假设元素最大个数是65536,我们把65536代入上述公式可知最大高度为 16。所以我们建议添加一个元素后为其建立的索引高度不超过 16。
因为我们要求尽可能保证每一个上级索引都是下级索引的一半,在实现高度生成算法时,我们可以这样设计:
- 跳表的高度计算从原始链表开始,即默认情况下插入的元素的高度为 1,代表没有索引,只有元素节点。
- 设计一个为插入元素生成节点索引高度 level 的方法。
- 进行一次随机运算,随机数值范围为 0-1 之间。
- 如果随机数大于 0.5 则为当前元素添加一级索引,自此我们保证生成一级索引的概率为 50% ,这也就保证了 1 级索引理想情况下只有一半的元素会生成索引。
- 同理后续每次随机算法得到的值大于 0.5 时,我们的索引高度就加 1,这样就可以保证节点生成的 2 级索引概率为 25% ,3 级索引为 12.5% ……
删除节点
假设我们这里要删除元素 10,我们必须定位到当前跳表各层元素小于 10 的最大值,索引执行步骤为:
- 2 级索引 4 的后继节点为 8,指针推进。
- 索引 8 无后继节点,该层无要删除的元素,指针直接向下。
- 1 级索引 8 后继节点为 10,说明 1 级索引 8 在进行删除时需要将自己的指针和 1 级索引 10 断开联系,将 10 删除。
- 1 级索引完成定位后,指针向下,后继节点为 9,指针推进。
- 9 的后继节点为 10,同理需要让其指向 null,将 10 删除。
代码实现
public class SkipList {
/**
* 跳表索引最大高度为16
*/
private static final int MAX_LEVEL = 16;
/**
* 每个节点添加一层索引高度的概率为二分之一
*/
private static final float PROB = 0.5 f;
/**
* 默认情况下的高度为1,即只有自己一个节点
*/
private int levelCount = 1;
/**
* 跳表最底层的节点,即头节点
*/
private Node h = new Node();
public SkipList() {}
public class Node {
private int data = -1;
/**
*
*/
private Node[] forwards = new Node[MAX_LEVEL];
private int maxLevel = 0;
@Override
public String toString() {
return "Node{" +
"data=" + data +
", maxLevel=" + maxLevel +
'}';
}
}
public void add(int value) {
//随机生成高度
int level = randomLevel();
Node newNode = new Node();
newNode.data = value;
newNode.maxLevel = level;
//创建一个node数组,用于记录小于当前value的最大值
Node[] maxOfMinArr = new Node[level];
//默认情况下指向头节点
for (int i = 0; i < level; i++) {
maxOfMinArr[i] = h;
}
//基于上述结果拿到当前节点的后继节点
Node p = h;
for (int i = level - 1; i >= 0; i--) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
maxOfMinArr[i] = p;
}
//更新前驱节点的后继节点为当前节点newNode
for (int i = 0; i < level; i++) {
newNode.forwards[i] = maxOfMinArr[i].forwards[i];
maxOfMinArr[i].forwards[i] = newNode;
}
//如果当前newNode高度大于跳表最高高度则更新levelCount
if (levelCount < level) {
levelCount = level;
}
}
/**
* 理论来讲,一级索引中元素个数应该占原始数据的 50%,二级索引中元素个数占 25%,三级索引12.5% ,一直到最顶层。
* 因为这里每一层的晋升概率是 50%。对于每一个新插入的节点,都需要调用 randomLevel 生成一个合理的层数。
* 该 randomLevel 方法会随机生成 1~MAX_LEVEL 之间的数,且 :
* 50%的概率返回 1
* 25%的概率返回 2
* 12.5%的概率返回 3 ...
* @return
*/
private int randomLevel() {
int level = 1;
while (Math.random() > PROB && level < MAX_LEVEL) {
++level;
}
return level;
}
public Node get(int value) {
Node p = h;
//找到小于value的最大值
for (int i = levelCount - 1; i >= 0; i--) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
}
//如果p的前驱节点等于value则直接返回
if (p.forwards[0] != null && p.forwards[0].data == value) {
return p.forwards[0];
}
return null;
}
/**
* 删除
*
* @param value
*/
public void delete(int value) {
Node p = h;
//找到各级节点小于value的最大值
Node[] updateArr = new Node[levelCount];
for (int i = levelCount - 1; i >= 0; i--) {
while (p.forwards[i] != null && p.forwards[i].data < value) {
p = p.forwards[i];
}
updateArr[i] = p;
}
//查看原始层节点前驱是否等于value,若等于则说明存在要删除的值
if (p.forwards[0] != null && p.forwards[0].data == value) {
//从最高级索引开始查看其前驱是否等于value,若等于则将当前节点指向value节点的后继节点
for (int i = levelCount - 1; i >= 0; i--) {
if (updateArr[i].forwards[i] != null && updateArr[i].forwards[i].data == value) {
updateArr[i].forwards[i] = updateArr[i].forwards[i].forwards[i];
}
}
}
//从最高级开始查看是否有一级索引为空,若为空则层级减1
while (levelCount > 1 && h.forwards[levelCount - 1] == null) {
levelCount--;
}
}
public void printAll() {
Node p = h;
//基于最底层的非索引层进行遍历,只要后继节点不为空,则速速出当前节点,并移动到后继节点
while (p.forwards[0] != null) {
System.out.println(p.forwards[0]);
p = p.forwards[0];
}
}
}