跳表实现


背景

有这么一道比较有意思的面试题:“Redis 的有序集合底层为什么要用跳表”

Redis 的有序集合

Redis 有个比较常用的数据结构叫有序集合(sorted set,简称 zset),正如其名它是一个可以保证有序且元素唯一的集合,所以它经常用于排行榜等需要进行统计排列的场景。

因为设计者考虑到 Redis 数据存放于内存,为了节约宝贵的内存空间,在有序集合元素小于 64 字节且个数小于 128 的时候,会使用 ziplist,而这个阈值的默认值的设置就来自下面这两个配置项。

zset-max-ziplist-value 64
zset-max-ziplist-entries 128

一旦有序集合中的某个元素超出这两个其中的一个阈值它就会转为 skiplist(实际是 dict+skiplist,还会借用字典来提高获取指定元素的效率)。

也就是说,ZSet 有两种不同的实现,分别是 ziplist 和 skiplist,具体使用哪种结构进行存储的规则如下:

  • 当有序集合对象同时满足以下两个条件时,使用 ziplist:
    • ZSet 保存的键值对数量少于 128 个;
    • 每个元素的长度小于 64 字节。
  • 如果不满足上述两个条件,那么使用 skiplist

跳表

有序链表在添加、查询、删除的平均时间复杂都都是 O(n) 即线性增长,所以一旦节点数量达到一定体量后其性能表现就会非常差劲。而跳表我们完全可以理解为在原始链表基础上,建立多级索引,通过多级索引检索定位将增删改查的时间复杂度变为 O(log n) 。可能这里说的有些抽象,我们举个例子,以下图跳表为例,其原始链表存储按序存储 1-10,有 2 级索引,每级索引的索引个数都是基于下层元素个数的一半。

查询节点

假如我们需要查询元素 6,其工作流程如下:

  • 从 2 级索引开始,先来到节点 4。
  • 查看 4 的后继节点,是 8 的 2 级索引,这个值大于 6,说明 2 级索引后续的索引都是大于 6 的,没有再往后搜寻的必要
  • 我们索引向下查找。来到 4 的 1 级索引,比对其后继节点为 6,查找结束。

相较于原始有序链表需要 6 次,我们的跳表通过建立多级索引,我们只需两次就直接定位到了目标元素,其查寻的复杂度被直接优化为O(log n)。

添加节点

假如我们需要在这个有序集合中添加一个元素 7,那么我们就需要通过跳表找到小于元素 7 的最大值,也就是元素 6 的位置,将其插入到元素 6 的后面,让元素 6 的索引指向新插入的节点 7,其工作流程如下:

  • 从 2 级索引开始定位到了元素 4 的索引。
  • 查看索引 4 的后继索引为 8,索引向下推进。
  • 来到 1 级索引,发现索引 4 后继索引为 6,小于插入元素 7,指针推进到索引 6 位置。- 继续比较 6 的后继节点为索引 8,大于元素 7,索引继续向下。
  • 最终我们来到 6 的原始节点,发现其后继节点为 7,指针没有继续向下的空间,自此我们可知元素 6 就是小于插入元素 7 的最大值,于是便将元素 7 插入。

索引高度

理想情况是每一层索引是下一层元素个数的二分之一,所以假设元素个数为 n,那么对应 k 层索引的元素个数 r 计算公式为: r=n/2^k

而 Redis 又是内存数据库,我们假设元素最大个数是65536,我们把65536代入上述公式可知最大高度为 16。所以我们建议添加一个元素后为其建立的索引高度不超过 16。

因为我们要求尽可能保证每一个上级索引都是下级索引的一半,在实现高度生成算法时,我们可以这样设计:

  • 跳表的高度计算从原始链表开始,即默认情况下插入的元素的高度为 1,代表没有索引,只有元素节点。
  • 设计一个为插入元素生成节点索引高度 level 的方法。
  • 进行一次随机运算,随机数值范围为 0-1 之间。
  • 如果随机数大于 0.5 则为当前元素添加一级索引,自此我们保证生成一级索引的概率为 50% ,这也就保证了 1 级索引理想情况下只有一半的元素会生成索引。
  • 同理后续每次随机算法得到的值大于 0.5 时,我们的索引高度就加 1,这样就可以保证节点生成的 2 级索引概率为 25% ,3 级索引为 12.5% ……

删除节点

假设我们这里要删除元素 10,我们必须定位到当前跳表各层元素小于 10 的最大值,索引执行步骤为:

  • 2 级索引 4 的后继节点为 8,指针推进。
  • 索引 8 无后继节点,该层无要删除的元素,指针直接向下。
  • 1 级索引 8 后继节点为 10,说明 1 级索引 8 在进行删除时需要将自己的指针和 1 级索引 10 断开联系,将 10 删除。
  • 1 级索引完成定位后,指针向下,后继节点为 9,指针推进。
  • 9 的后继节点为 10,同理需要让其指向 null,将 10 删除。

代码实现

public class SkipList {

    /**
     * 跳表索引最大高度为16
     */
    private static final int MAX_LEVEL = 16;

    /**
     * 每个节点添加一层索引高度的概率为二分之一
     */
    private static final float PROB = 0.5 f;

    /**
     * 默认情况下的高度为1,即只有自己一个节点
     */
    private int levelCount = 1;

    /**
     * 跳表最底层的节点,即头节点
     */
    private Node h = new Node();

    public SkipList() {}

    public class Node {
        private int data = -1;
        /**
         *
         */
        private Node[] forwards = new Node[MAX_LEVEL];
        private int maxLevel = 0;

        @Override
        public String toString() {
            return "Node{" +
                "data=" + data +
                ", maxLevel=" + maxLevel +
                '}';
        }
    }

    public void add(int value) {

        //随机生成高度
        int level = randomLevel();

        Node newNode = new Node();
        newNode.data = value;
        newNode.maxLevel = level;

        //创建一个node数组,用于记录小于当前value的最大值
        Node[] maxOfMinArr = new Node[level];
        //默认情况下指向头节点
        for (int i = 0; i < level; i++) {
            maxOfMinArr[i] = h;
        }

        //基于上述结果拿到当前节点的后继节点
        Node p = h;
        for (int i = level - 1; i >= 0; i--) {
            while (p.forwards[i] != null && p.forwards[i].data < value) {
                p = p.forwards[i];
            }
            maxOfMinArr[i] = p;
        }

        //更新前驱节点的后继节点为当前节点newNode
        for (int i = 0; i < level; i++) {
            newNode.forwards[i] = maxOfMinArr[i].forwards[i];
            maxOfMinArr[i].forwards[i] = newNode;
        }

        //如果当前newNode高度大于跳表最高高度则更新levelCount
        if (levelCount < level) {
            levelCount = level;
        }

    }

    /**
     * 理论来讲,一级索引中元素个数应该占原始数据的 50%,二级索引中元素个数占 25%,三级索引12.5% ,一直到最顶层。
     * 因为这里每一层的晋升概率是 50%。对于每一个新插入的节点,都需要调用 randomLevel 生成一个合理的层数。
     * 该 randomLevel 方法会随机生成 1~MAX_LEVEL 之间的数,且 :
     * 50%的概率返回 1
     * 25%的概率返回 2
     *  12.5%的概率返回 3 ...
     * @return
     */
    private int randomLevel() {
        int level = 1;
        while (Math.random() > PROB && level < MAX_LEVEL) {
            ++level;
        }
        return level;
    }

    public Node get(int value) {
        Node p = h;
        //找到小于value的最大值
        for (int i = levelCount - 1; i >= 0; i--) {
            while (p.forwards[i] != null && p.forwards[i].data < value) {
                p = p.forwards[i];
            }
        }
        //如果p的前驱节点等于value则直接返回
        if (p.forwards[0] != null && p.forwards[0].data == value) {
            return p.forwards[0];
        }

        return null;
    }

    /**
     * 删除
     *
     * @param value
     */
    public void delete(int value) {
        Node p = h;
        //找到各级节点小于value的最大值
        Node[] updateArr = new Node[levelCount];
        for (int i = levelCount - 1; i >= 0; i--) {
            while (p.forwards[i] != null && p.forwards[i].data < value) {
                p = p.forwards[i];
            }
            updateArr[i] = p;
        }
        //查看原始层节点前驱是否等于value,若等于则说明存在要删除的值
        if (p.forwards[0] != null && p.forwards[0].data == value) {
            //从最高级索引开始查看其前驱是否等于value,若等于则将当前节点指向value节点的后继节点
            for (int i = levelCount - 1; i >= 0; i--) {
                if (updateArr[i].forwards[i] != null && updateArr[i].forwards[i].data == value) {
                    updateArr[i].forwards[i] = updateArr[i].forwards[i].forwards[i];
                }
            }
        }

        //从最高级开始查看是否有一级索引为空,若为空则层级减1
        while (levelCount > 1 && h.forwards[levelCount - 1] == null) {
            levelCount--;
        }

    }

    public void printAll() {
        Node p = h;
        //基于最底层的非索引层进行遍历,只要后继节点不为空,则速速出当前节点,并移动到后继节点
        while (p.forwards[0] != null) {
            System.out.println(p.forwards[0]);
            p = p.forwards[0];
        }

    }

}

Author: stream
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source stream !
  TOC