Yedis 是一款高性能的nosql数据库,旨在能在某些方面替代Redis。它由不著名码农、秦汉史历史学家、本站站长Yebangyu同学在业余时间独立开发完成。
Yedis也同样支持rank功能,但是所使用的数据结构是Treap(Redis使用的是Skip List)。这两种都是概率性的高级数据结构,具体性能依赖于随机数的选择。
那么,什么是Treap呢?
(图片来源于WikiMedia)
简单说来,Treap是这样的树:Treap中的每个节点至少包含key和优先级两个字段,其中key满足搜索树性质,优先级满足堆序性。如上图所示,数字是优先级字段,字母是key字段。其中优先级构成了一个大根堆。
也就是说,在Treap中,key组成了一个二叉搜索树,优先级组成了一个堆。所谓Treap = Tree + Heap。
插入一个节点时,随机生成一个优先级。这时候可能堆序性被破坏,而这可以通过旋转来恢复。由于只可能有左右单旋转两种情形,因此它的代码编写比AVL树、Red-Black Tree要简单的多,并且可以证明旋转的期望次数小于2。
在Treap上的删除、插入、查找的期望时间复杂度都是O($lgn$)。
/*
* treap.h
*/
#ifndef YEDIS_TREAP_H_
#define YEDIS_TREAP_H_
#include <stdlib.h>
#include <time.h>
#define YEDIS_INT32_MAX 2147483647
#define nullptr 0
struct YedisTreapNode
{
int key;//binary search tree
int value;
YedisTreapNode *left;
YedisTreapNode *right;
int priority;//max heap
};
class YedisTreap
{
public:
int init();
YedisTreapNode *find(const int key);
int insert(const int key, const int value);
private:
int insert_(const int key, const int value,YedisTreapNode *&treap);
YedisTreapNode *left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2);
YedisTreapNode *right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2);
private:
YedisTreapNode *root_;
YedisTreapNode *sentinel_;
};
#define INSERT(subtree1, substree2) \
ret = insert_(key, value, treap->subtree1); \
if (treap->subtree1->priority > treap->priority) { \
treap = substree2##_rotation_(treap, treap->subtree1); \
}
int YedisTreap::init()
{
int ret = 0;
sentinel_ = root_ = nullptr;
YedisTreapNode *tmp = nullptr;
if (0 == (tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) {
ret = -1;
} else {
tmp->key = 0;
tmp->value = 0;
tmp->left = tmp->right = nullptr;
tmp->priority = YEDIS_INT32_MAX;
sentinel_ = tmp;
}
return 0;
}
int YedisTreap::insert(const int key, const int value)
{
int ret = 0;
if (nullptr == sentinel_) {
ret = -2;
} else if (nullptr == root_) {
YedisTreapNode *tmp = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode)));
if (nullptr == tmp) {
ret = -1;
} else {
tmp->key = key;
tmp->value = value;
tmp->left = tmp->right = nullptr;
tmp->priority = key;
sentinel_->left = tmp;
root_ = tmp;
}
} else {
ret = insert_(key, value, root_);
}
return ret;
}
int YedisTreap::insert_(const int key, const int value, YedisTreapNode *&treap)
{
int ret = 0;
if (nullptr == treap) {
if (nullptr == (treap = static_cast<YedisTreapNode*>(malloc(sizeof(YedisTreapNode))))) {
ret = -1;
} else {
treap->key = key;
treap->value = value;
treap->left = treap->right = nullptr;
srand((int)time(NULL));
treap->priority = (int)rand();
}
} else if(treap->key > key) {
INSERT(left, right);
} else if(treap->key < key) {
INSERT(right, left);
} else if (treap->priority & 1) {
INSERT(right, left);//duplicate elements
} else {
INSERT(left, right);//duplicate elements
}
return ret;
}
YedisTreapNode *YedisTreap::left_rotation_(YedisTreapNode *k1, YedisTreapNode *k2)
{
k1->right = k2->left;
k2->left = k1;
return k2;
}
YedisTreapNode *YedisTreap::right_rotation_(YedisTreapNode *k1, YedisTreapNode *k2)
{
k1->left = k2->right;
k2->right = k1;
return k2;
}
YedisTreapNode *YedisTreap::find(const int key)
{
YedisTreapNode *tmp = root_;
while(tmp) {
if (tmp->key > key) {
tmp = tmp->left;
} else if (tmp->key < key) {
tmp = tmp->right;
} else {
break;
}
}
return tmp;
}
#endif /* YEDIS_TREAP_H_ */
说明:
1,93-95行处理重复key的情形,为了不让某个子树过分倾斜,我们选择根据priority的奇偶性来决定把重复key插入到哪个子树。
2,通过设置一个具有最大优先级的sentinel节点,来简化处理和判断。
3,显然,递归地实现Treap比较方便和容易。
本次PK的对象Skip List的代码,来自我的朋友我的上铺叫路遥之前写的一个 实现
测试程序则是在他之前写的 文件 的基础上稍作修改。(主要是修改为更准确的计时方式)。
测试数据则是随机打乱的10,000,000个元素。
对于Treap的测试程序如下:
/*
* test_treap.cpp
*/
#include "treap.h"
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <vector>
#include <algorithm>
using namespace std;
#define N 10000000
int main()
{
int i;
int *key = (int*) malloc(N * sizeof(int));
vector<int> tmp;
cout<<"generating data for test..."<<endl;
for (i = 0; i < N; i++) {
tmp.push_back(i);
}
//通过random_shuffle打乱数据
std::random_shuffle(tmp.begin(),tmp.end());
for (i = 0; i < N; i++) {
key[i] = tmp[i];
}
cout<<"generating data for test successfully..."<<endl;
YedisTreap *p = new YedisTreap();
p->init();
timespec t1,t2;
cout<<"starting to insert data to treap"<<endl;
clock_gettime(CLOCK_MONOTONIC, &t1);
for (i = 0; i < N; i++) {
p->insert(key[i], key[i]);
}
clock_gettime(CLOCK_MONOTONIC, &t2);
long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
cout<<"Insertion takes "<<cost1<<"ns"<<endl;
cout<<"starting to search data in treap"<<endl;
clock_gettime(CLOCK_MONOTONIC, &t1);
for (i = 0; i < N; i++) {
YedisTreapNode *node = p->find(key[i]);
if (!node) cout<<"so bad"<<endl;
}
clock_gettime(CLOCK_MONOTONIC, &t2);
long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
cout<<"Searching takes "<<cost2<<"ns"<<endl;
return 0;
}
Skip List的测试程序如下:
/*
* test_skiplist.cpp
*/
#include<stdio.h>
#include<stdlib.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <vector>
#include <algorithm>
#include "skiplist.h"
using namespace std;
#define N 10000000
int main()
{
int i;
int *key = (int*) malloc(N * sizeof(int));
vector<int> tmp;
cout<<"generating data for test..."<<endl;
for (i = 0; i < N; i++) {
tmp.push_back(i);
}
std::random_shuffle(tmp.begin(),tmp.end());
for (i = 0; i < N; i++) {
key[i] = tmp[i];
}
cout<<"generating data for test successfully..."<<endl;
struct skiplist *list = skiplist_new();
cout<<"starting to insert data to skip list"<<endl;
timespec t1,t2;
clock_gettime(CLOCK_MONOTONIC, &t1);
for (i = 0; i < N; i++) {
skiplist_insert(list, key[i], key[i]);
}
clock_gettime(CLOCK_MONOTONIC, &t2);
long long cost1 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
cout<<"Insertion takes "<<cost1<<"ns"<<endl;
cout<<"starting to search data in skip list"<<endl;
clock_gettime(CLOCK_MONOTONIC, &t1);
for (i = 0; i < N; i++) {
struct skipnode *node = skiplist_search(list, key[i]);
if (!node) cout<<"so bad"<<endl;
}
clock_gettime(CLOCK_MONOTONIC, &t2);
long long cost2 = (t2.tv_sec - t1.tv_sec) * 1000000000 + t2.tv_nsec - t1.tv_nsec;
cout<<"Searching takes "<<cost2<<"ns"<<endl;
skiplist_delete(list);
return 0;
}
编译链接这两个测试程序时,请记得加 -lrt
选项。
测试环境是Ubuntu 14.04 64位系统 + 16GB内存 + gcc4.8
未开启优化:
SkipList:插入,27783017437ns = 27s,查找,30369272721ns = 30s
Treap:插入,36651505605ns = 36s,查找,15831823811ns = 15s
开启O3优化后:
SkipList:插入,25877643715ns = 25s,查找,28741237934ns = 28s
Treap:插入,35317025584ns = 35s,查找,14883111063ns = 14s
可以看出在查找上,Treap的性能确实是可圈可点的。
1,为什么Treap的查找会这么高效?
针对本次测试数据,对find函数进行统计分析,发现查找的平均深度是29,而lg(10000000) = 23.2。也就是说本次测试时平均深度是$1.26lgn$。BTW,根据分析,AVL树的最大深度不会超过$1.44lgn$,对于Red-Black Tree,这个值大概在$2lg(n+1)$。
2,如何优化Treap的插入性能?
可能可以实施的点包括优化随机数生成程序、将递归改为非递归实现、优化分支预测等。具体怎么做?无从下手?建议您阅读我的这篇博客。
1,Mark Allen Weiss的《Data Structures & Algorithm Analysis in C++》中介绍了Treap,这也是我第一次接触和知道Treap的地方。
2,《Introduction to Algorithms》中在某个章节里,以习题的形式介绍了Treap。
3,之前陈利人童鞋在微博上推荐了某大学的某学生写的Treap资料,他们都说好。我没看,也没兴趣。
我来评几句
登录后评论已发表评论数()