Treap:一种高效的高级数据结构

写在最前

Yedis 是一款高性能的nosql数据库,旨在能在某些方面替代Redis。它由不著名码农、秦汉史历史学家、本站站长Yebangyu同学在业余时间独立开发完成。

Github请访问 这里 ,Python客户端请点击 这里

Treap

Yedis也同样支持rank功能,但是所使用的数据结构是Treap(Redis使用的是Skip List)。这两种都是概率性的高级数据结构,具体性能依赖于随机数的选择。

那么,什么是Treap呢?

(图片来源于WikiMedia)

简单说来,Treap是这样的树:Treap中的每个节点至少包含key和优先级两个字段,其中key满足搜索树性质,优先级满足堆序性。如上图所示,数字是优先级字段,字母是key字段。其中优先级构成了一个大根堆。

也就是说,在Treap中,key组成了一个二叉搜索树,优先级组成了一个堆。所谓Treap = Tree + Heap。

插入一个节点时,随机生成一个优先级。这时候可能堆序性被破坏,而这可以通过旋转来恢复。由于只可能有左右单旋转两种情形,因此它的代码编写比AVL树、Red-Black Tree要简单的多,并且可以证明旋转的期望次数小于2。

在Treap上的删除、插入、查找的期望时间复杂度都是O($lgn$)。

Treap PK SkipList

Treap实现

/*
 * treap.h
*/
#ifndef YEDIS_TREAP_H_
#define YEDIS_TREAP_H_
#include <stdlib.h>
#include <time.h>
#define YEDIS_INT32_MAX 2147483647
#define nullptr 0

struct YedisTreapNode
{
  int key;//binary search tree
  int value;
  YedisTreapNode *left;
  YedisTreapNode *right;
  int priority;//max heap
};

class YedisTreap
{
public:
  int init();
  YedisTreapNode *find(const int key);
  int insert(const int key, const int value);
private:
  int insert_(const int key, const int value,YedisTreapNode *&treap);
  YedisTreapNode *left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2);
  YedisTreapNode *right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2);
private:
  YedisTreapNode *root_;
  YedisTreapNode *sentinel_;
};

#define INSERT(subtree1, substree2) \
   ret = insert_(key, value, treap->subtree1); \
   if (treap->subtree1->priority > treap->priority) { \
     treap = substree2##_rotation_(treap, treap->subtree1); \
   }

 int YedisTreap::init()
 {
   int ret = 0;
   sentinel_ = root_ = nullptr;
   YedisTreapNode *tmp = nullptr;
   if (0 == (tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) {
     ret = -1;
   } else {
     tmp->key = 0;
     tmp->value = 0;
     tmp->left = tmp->right = nullptr;
     tmp->priority = YEDIS_INT32_MAX;
     sentinel_ = tmp;
   }
   return 0;
 }
 int YedisTreap::insert(const int key, const int value)
 {
   int ret = 0;
   if (nullptr == sentinel_) {
     ret = -2;
   } else if (nullptr == root_) {
     YedisTreapNode *tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode)));
     if (nullptr == tmp) {
       ret = -1;
     } else {
       tmp->key = key;
       tmp->value = value;
       tmp->left = tmp->right = nullptr;
       tmp->priority = key;
       sentinel_->left = tmp;
       root_ = tmp;
     }
   } else {
     ret = insert_(key, value, root_);
   }
   return ret;
 }
 int YedisTreap::insert_(const int key, const int value, YedisTreapNode *&treap)
 {
   int ret = 0;
   if (nullptr == treap) {
     if (nullptr == (treap = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) {
       ret = -1;
     } else {
       treap->key = key;
       treap->value = value;
       treap->left = treap->right = nullptr;
       srand((int)time(NULL));
       treap->priority = (int)rand();
     }
   } else if(treap->key > key) {
     INSERT(left, right);
   } else if(treap->key < key) {
     INSERT(right, left);
   } else if (treap->priority & 1) {
     INSERT(right, left);//duplicate elements
   } else {
     INSERT(left, right);//duplicate elements
   }
   return ret;
 }
 YedisTreapNode *YedisTreap::left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2)
 {
   k1->right = k2->left;
   k2->left = k1;
   return k2;
 }
 YedisTreapNode *YedisTreap::right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2)
 {
   k1->left = k2->right;
   k2->right = k1;
   return k2;
 }
 YedisTreapNode *YedisTreap::find(const int key)
 {
   YedisTreapNode *tmp = root_;
   while(tmp) {
     if (tmp->key > key) {
       tmp = tmp->left;
     } else if (tmp->key < key) {
       tmp = tmp->right;
     } else {
       break;
     }
   }
   return tmp;
 }

#endif /* YEDIS_TREAP_H_ */

说明:

1,93-95行处理重复key的情形,为了不让某个子树过分倾斜,我们选择根据priority的奇偶性来决定把重复key插入到哪个子树。

2,通过设置一个具有最大优先级的sentinel节点,来简化处理和判断。

3,显然,递归地实现Treap比较方便和容易。

测试程序

本次PK的对象Skip List的代码,来自我的朋友我的上铺叫路遥之前写的一个 实现

测试程序则是在他之前写的 文件 的基础上稍作修改。(主要是修改为更准确的计时方式)。

测试数据则是随机打乱的10,000,000个元素。

对于Treap的测试程序如下:

/*
 * test_treap.cpp
*/
#include "treap.h"
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <vector>
#include <algorithm>
using namespace std;
#define N 10000000
int main()
{
  int i;
  int *key = (int*) malloc(N * sizeof(int));
  vector<int> tmp;
  cout<<"generating data for test..."<<endl;
  for (i = 0; i < N; i++) {
    tmp.push_back(i);
  }
  //通过random_shuffle打乱数据
  std::random_shuffle(tmp.begin(),tmp.end());
  for (i = 0; i < N; i++) {
    key[i] = tmp[i];
  }
  cout<<"generating data for test successfully..."<<endl;
  YedisTreap *p = new YedisTreap();
  p->init();
  timespec t1,t2;
  cout<<"starting to insert data to treap"<<endl;
  clock_gettime(CLOCK_MONOTONIC, &t1);
  for (i = 0; i < N; i++) {
    p->insert(key[i], key[i]);
  }
  clock_gettime(CLOCK_MONOTONIC, &t2);
  long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
  cout<<"Insertion takes "<<cost1<<"ns"<<endl;
  cout<<"starting to search data in treap"<<endl;
  clock_gettime(CLOCK_MONOTONIC, &t1);
  for (i = 0; i < N; i++) {
    YedisTreapNode *node = p->find(key[i]);
    if (!node) cout<<"so bad"<<endl;
  }
  clock_gettime(CLOCK_MONOTONIC, &t2);
  long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
  cout<<"Searching takes "<<cost2<<"ns"<<endl;
  return 0;
}

Skip List的测试程序如下:

/*
 * test_skiplist.cpp
*/
#include<stdio.h>
#include<stdlib.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <vector>
#include <algorithm>
#include "skiplist.h"
using namespace std;
#define N 10000000
int main()
{
  int i;
  int *key = (int*) malloc(N * sizeof(int));
  vector<int> tmp;
  cout<<"generating data for test..."<<endl;
  for (i = 0; i < N; i++) {
    tmp.push_back(i);
  }
  std::random_shuffle(tmp.begin(),tmp.end());
  for (i = 0; i < N; i++) {
    key[i] = tmp[i];
  }
  cout<<"generating data for test successfully..."<<endl;
  struct skiplist *list = skiplist_new();
  cout<<"starting to insert data to skip list"<<endl;
  timespec t1,t2;
  clock_gettime(CLOCK_MONOTONIC, &t1);
  for (i = 0; i < N; i++) {
    skiplist_insert(list, key[i], key[i]);
  }
  clock_gettime(CLOCK_MONOTONIC, &t2);
  long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
  cout<<"Insertion takes "<<cost1<<"ns"<<endl;
  cout<<"starting to search data in skip list"<<endl;
  clock_gettime(CLOCK_MONOTONIC, &t1);
  for (i = 0; i < N; i++) {
    struct skipnode *node = skiplist_search(list, key[i]);
    if (!node) cout<<"so bad"<<endl;
  }
  clock_gettime(CLOCK_MONOTONIC, &t2);
  long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
  cout<<"Searching takes "<<cost2<<"ns"<<endl;
  skiplist_delete(list);
  return 0;
}

编译链接这两个测试程序时,请记得加 -lrt 选项。

测试结果

测试环境是Ubuntu 14.04 64位系统 + 16GB内存 + gcc4.8

未开启优化:

SkipList:插入,27783017437ns = 27s,查找,30369272721ns = 30s

Treap:插入,36651505605ns = 36s,查找,15831823811ns = 15s

开启O3优化后:

SkipList:插入,25877643715ns = 25s,查找,28741237934ns = 28s

Treap:插入,35317025584ns = 35s,查找,14883111063ns = 14s

可以看出在查找上,Treap的性能确实是可圈可点的。

Further Thinking

1,为什么Treap的查找会这么高效?

针对本次测试数据,对find函数进行统计分析,发现查找的平均深度是29,而lg(10000000) = 23.2。也就是说本次测试时平均深度是$1.26lgn$。BTW,根据分析,AVL树的最大深度不会超过$1.44lgn$,对于Red-Black Tree,这个值大概在$2lg(n+1)$。

2,如何优化Treap的插入性能?

可能可以实施的点包括优化随机数生成程序、将递归改为非递归实现、优化分支预测等。具体怎么做?无从下手?建议您阅读我的这篇博客。

参考文献

1,Mark Allen Weiss的《Data Structures & Algorithm Analysis in C++》中介绍了Treap,这也是我第一次接触和知道Treap的地方。

2,《Introduction to Algorithms》中在某个章节里,以习题的形式介绍了Treap。

3,之前陈利人童鞋在微博上推荐了某大学的某学生写的Treap资料,他们都说好。我没看,也没兴趣。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章