基于跳表实现的键值型存储引擎

加油伙计!

基于跳表实现的键值型存储引擎

1 项目说明

Key-Value 存储引擎 🌻

基于 跳表 实现的 KV 存储引擎,使用 C++ 实现。在随机读写情况下,该项目每秒可处理请求数(QPS):24.39 W,每秒可处理读请求数:18.41 W。(项目作者说明,但笔者测试不相同)

提供接口

🔹 insert_element 插入数据

🔸 delete_element 删除数据

🔹 search_element 查询数据

🔸 update_element 更新数据 (笔者添加)

🔹 display_list 打印跳跃表

🔸 dump_file 数据落盘

🔹 load_file 加载数据

🔸 size 返回数据规模

🔹 clear 清空跳表 (笔者添加)

项目地址Skiplist-CPP: A tiny KV storage based on skiplist written in C++ language

补充说明 :该项目作者:程序员 Carl,公众号:代码随想录。笔者对其代码进行学习,并尝试修改。修改的内容见下文。


2 项目代码

所有实现代码位于 skiplist.h 头文件中,CPP 文件中 include 它就可以使用。内部关键在于定义了 SkipList 结构,以及 Node 结构。

2.1 头文件

1
2
3
4
5
6
#include <iostream> 
#include <cstdlib>
#include <cmath>
#include <cstring>
#include <mutex>
#include <fstream>

2.2 基本定义

1
2
3
#define STORE_FILE "store/dumpFile"
std::mutex mtx, mtx1;
std::string delimiter = ":";

说明 🌞

🔹 宏定义数据落盘位置,以及加载数据位置;

🔸 定义互斥量锁,用于写操作中的临界区;

🔹 定义分隔符,用于加载数据是识别 KeyValue


2.3 Node

2.3.1 Node 定义

Node 结构是 SkipList 实现的基础。其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template<typename K, typename V> 
class Node {
// ~ Node 节点定义
public:
// 构造函数
Node() {}
Node(K k, V v, int);
// 析构函数
~Node();
// 键值对操作相关的成员函数
K get_key() const; // 取 Key
V get_value() const; // 取 value
void set_value(V); // 设定 value

// 前向指针数组,内部存前向指针,指向下一个 Node
Node<K, V> **forward;
// 节点层数
int node_level;
private:
// 数据成员
K key;
V value;
};

说明 🌞

🔹 私有数据成员 keyvalue ,公有数据成员 forwardnode_level

🔸 成员函数:构造函数,析构函数,键值对操作函数

注意 🙋‍♂

forward 是一个指针数组,其元素是指向 Node 的指针。


2.3.2 构造函数和析构函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename K, typename V> 
Node<K, V>::Node(const K k, const V v, int level) {
this->key = k;
this->value = v;
this->node_level = level;
// forward 大小为 level + 1
this->forward = new Node<K, V>*[level + 1]; // 分配内存
memset(this->forward, 0, sizeof(Node<K, V>*)*(level + 1)); // 初始化
};

template<typename K, typename V>
Node<K, V>::~Node() {
delete []forward; // 释放内存
};

2.3.3 键值对操作函数

1
2
3
4
5
6
7
8
9
10
11
12
13
template<typename K, typename V> 
K Node<K, V>::get_key() const {
return key;
};

template<typename K, typename V>
V Node<K, V>::get_value() const {
return value;
};
template<typename K, typename V>
void Node<K, V>::set_value(V value) {
this->value = value;
};

2.4 SkipList

2.4.1 SkipList 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template <typename K, typename V> 
class SkipList {
public:
// 构造和析构函数,创建 Node 节点函数
SkipList(int);
~SkipList();
Node<K, V>* create_node(K, V, int);

// 增删改查操作函数
int insert_element(K, V); // 增
int delete_element(K); // 删
int update_element(K, V, bool); // 改
bool search_element(K); // 查

void display_list(); // 打印跳表
void clear(); // 清空跳表
int size(); // 返回跳表节点数(不包含头节点)

// 数据落盘和数据加载
void dump_file();
void load_file();

// 获得随机层高函数
int get_random_level();

private:
// 数据加载相关函数, 用来区分 key 和 value
void get_key_value_from_string(const std::string& str, std::string* key, std::string* value);
bool is_valid_string(const std::string& str);

private:
int _max_level; // 跳表层数上限
int _skip_list_level; // 当前跳表的最高层
int _element_count; // 跳表中节点数
Node<K, V> *_header; // 跳表中头节点指针

// file operator
std::ofstream _file_writer;
std::ifstream _file_reader;
};

说明 🌞

🔹 私有数据成员 _max_level, _skip_list_level, _element_count, header, _file_writer, file_reader

🔸 成员函数:构造函数,析构函数,创建节点函数,增删改查操作函数,清空跳表,数据落盘和加载数据及其相关函数,打印跳表函数,返回跳表节点数函数,获得随机层高函数 。


2.4.2 构造函数、析构函数、节点构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename K, typename V> 
SkipList<K, V>::SkipList(int max_level) {
this->_max_level = max_level;
this->_skip_list_level = 0;
this->_element_count = 0;
K k; V v;
this->_header = new Node<K, V>(k, v, _max_level);
};

template<typename K, typename V>
SkipList<K, V>::~SkipList() {

if (_file_writer.is_open()) {
_file_writer.close();
}
if (_file_reader.is_open()) {
_file_reader.close();
}
delete _header;
}

template<typename K, typename V>
Node<K, V>* SkipList<K, V>::create_node(const K k, const V v, int level) {
Node<K, V> *n = new Node<K, V>(k, v, level);
return n;
}

2.4.3 获得随机层高函数以及返回跳表大小函数

1
2
3
4
5
6
7
8
9
10
11
12
13
template<typename K, typename V>
int SkipList<K, V>::get_random_level(){
int k = 1;
while (rand() % 2) {
k++;
}
k = (k < _max_level) ? k : _max_level; // 最大不超过上限
return k;
};
template<typename K, typename V>
int SkipList<K, V>::size() {
return _element_count; // 会随着节点的添加, 删除, 更新改变
}

注意 🙋 层高的选择一般根据 幂次定律 (power law),越大的数出现的概率越小 。


2.4.4 insert_element()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
template<typename K, typename V>
int SkipList<K, V>::insert_element(const K key, const V value) {
mtx.lock(); // 写操作,加锁
Node<K, V> *current = this->_header; // 从头节点遍历

// update 是一个指针数组,数组内存放指针,指向 node 节点,其索引代表层
Node<K, V> *update[_max_level + 1]; // update 的大小 >= forward
memset(update, 0, sizeof(Node<K, V>*)*(_max_level + 1)); // 初始化

// 从最高层开始遍历
for(int i = _skip_list_level; i >= 0; i--) {
// 只要当前节点非空,且 key 小于目标, 就会向后遍历
while(current->forward[i] != NULL && current->forward[i]->get_key() < key) {
current = current->forward[i]; // 节点向后移动
}
update[i] = current; // update[i] 记录当前层最后符合要求的节点
}

// 遍历到 level 0 说明到达最底层了,forward[0]指向的就是跳表下一个邻近节点
current = current->forward[0];
// 注意此时 current->get_key() >= key !!!

// 1. 插入元素已经存在
if (current != NULL && current->get_key() == key) {
std::cout << "key: " << key << ", exists" << std::endl;
mtx.unlock();
return -1; // 插入元素已经存在,返回 -1,插入失败
}
// 2. 如果当前 current 不存在,或者 current->get_key > key
if (current == NULL || current->get_key() != key ) {
// 随机生成层的高度,也即 forward[] 大小
int random_level = get_random_level();

// 如果新添加的节点层高大于当前跳表层高,则需要更新 update 数组
// 将原本[_skip_list_level random_level]范围内的NULL改为_header
if (random_level > _skip_list_level) {
for (int i = _skip_list_level + 1; i < random_level + 1; i++) {
update[i] = _header;
}
_skip_list_level = random_level; // 最后更新跳表层高
}
// 创建节点,并进行插入操作
Node<K, V>* inserted_node = create_node(key, value, random_level);
// 该操作等价于:
// new_node->next = pre_node->next;
// pre_node->next = new_node; 只不过是逐层进行
for (int i = 0; i <= random_level; i++) {
inserted_node->forward[i] = update[i]->forward[i];
update[i]->forward[i] = inserted_node;
}
std::cout << "Successfully inserted key: " << key << ", value: " << value << std::endl;
_element_count ++; // 更新节点数
}
mtx.unlock();
return 0; // 返回 0,插入成功
}

说明 🌞 :代码详细解释见注释。

注意 🙋‍♂

🔸 update 数组存在于所有 操作中,起至关重要的作用。update[i] 记录第 i 层最后符合要求的节点

🔹 所有 操作(增删改)都需要判断,1️⃣ 当前 key 存在,2️⃣ 当前 key 不存在,这两种情况。


2.4.5 delete_element()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
template<typename K, typename V> 
int SkipList<K, V>::delete_element(K key) {
// 笔者修改了其返回值,如果返回 0 删除成功,返回 -1 删除失败
// 操作同 insert_element
mtx.lock();
Node<K, V> *current = this->_header;
Node<K, V> *update[_max_level + 1];
memset(update, 0, sizeof(Node<K, V>*)*(_max_level + 1));

for (int i = _skip_list_level; i >= 0; i--) {
while (current->forward[i] !=NULL && current->forward[i]->get_key() < key) {
current = current->forward[i];
}
update[i] = current;
}
current = current->forward[0];

// 1. 非空,且 key 为目标值
if (current != NULL && current->get_key() == key) {
// 从最底层开始删除 update->forward 指向的节点,即目标节点
for (int i = 0; i <= _skip_list_level; i++) {
// 如果 update[i] 已经不指向 current 说明 i 的上层也不会指向 current
// 也说明了被删除节点层高 i - 1。直接退出循环即可
if (update[i]->forward[i] != current)
break;
// 删除操作,等价于 node->next = node->next->next
update[i]->forward[i] = current->forward[i];
}
// 因为可能删除的元素它的层数恰好是当前跳跃表的最大层数
// 所以此时需要重新确定 _skip_list_level,通过头节点判断
while (_skip_list_level > 0 && _header->forward[_skip_list_level] == 0) {
_skip_list_level --;
}

std::cout << "Successfully deleted key : "<< key << std::endl;
_element_count --;
mtx.unlock();
return 0; // 返回值 0 说明成功删除
}
// 2. 笔者添加了没有该键时的情况,打印输出提示
else {
std::cout << key << " is not exist, please check your input !\n";
mtx.unlock();
return -1; // 返回值 -1 说明没有该键值
}
}

说明 🌞 :代码详细解释见注释。


2.4.6 update_element()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 笔者添加了修改值的操作
// 1. 如果当前键存在,更新值
// 2. 如果当前键不存在,通过 flag 指示是否创建该键 (默认false)
// 2.1 flag = true :创建 key value
// 2.2 flag = false : 返回键不存在
// 返回值 1 表示更新成功, 返回值 0 表示创建成功, 返回值 -1 表示更新失败且创建失败
template<typename K, typename V>
int SkipList<K, V>::update_element(const K key, const V value, bool flag = false) {
// 同 insert,delete 操作
mtx1.lock(); // 插入操作,加锁, 使用 mtx1
Node<K, V> *current = this->_header;
Node<K, V> *update[_max_level + 1];
memset(update, 0, sizeof(Node<K, V>*)*(_max_level + 1));
for(int i = _skip_list_level; i >= 0; i--) {
while(current->forward[i] != NULL && current->forward[i]->get_key() < key) {
current = current->forward[i];
}
update[i] = current;
}
current = current->forward[0];
// 1. 插入元素已经存在
if (current != NULL && current->get_key() == key) {
std::cout << "key: " << key << ", exists" << std::endl;
std::cout << "old value : " << current->get_value() << " --> "; // ~ 打印 old value
current->set_value(value); // 重新设置 value, 并打印输出。
std::cout << "new value : " << current->get_value() << std::endl;
mtx1.unlock();
return 1; // 插入元素已经存在,只是修改操作,返回 1 说明更新成功
}
// 2. 如果插入的元素不存在
// 2.1 flag = true,允许更新创建操作,则使用 insert_element 添加
if (flag) {
SkipList<K, V>::insert_element(key, value);
mtx1.unlock();
return 0; // 说明 key 不存在,但是创建了它
}
// 2.1 flag = false, 不允许更新创建操作, 打印提示信息
else {
std::cout << key << " is not exist, please check your input !\n";
mtx1.unlock();
return -1; // 表示 key 不存在,并且不被允许创建
}
}

说明 🌞 :代码详细解释见注释。

注意 🙋‍♂ :笔者添加了修改值的操作,并提供了两种类型的操作

🔹 硬更新:如果键不存在,直接打印提示信息 flag = false

🔸 软更新:如果键不存在,允许创建它 flag = true


2.4.7 search_element()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename K, typename V> 
bool SkipList<K, V>::search_element(K key) {
std::cout << "search_element..." << std::endl;
Node<K, V> *current = _header;
// 从最高层开始遍历,找到最底层中最后一个满足小于key的节点
for (int i = _skip_list_level; i >= 0; i--) {
while (current->forward[i] && current->forward[i]->get_key() < key) {
current = current->forward[i];
}
}
current = current->forward[0]; // 该操作后 current->get_key >= key 或者 null
// 找到
if (current != NULL && current->get_key() == key) {
std::cout << "Found key: " << key << ", value: " << current->get_value() << std::endl;
return true;
}
// 没找到
std::cout << "Not Found Key: " << key << std::endl;
return false;
}

说明 🌞 :代码详细解释见注释。


2.4.8 打印跳表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template<typename K, typename V> 
void SkipList<K, V>::display_list() {
std::cout << "\n******** Skip List ********"<<"\n";
// 逐层打印
for (int i = 0; i <= _skip_list_level; i++) {
Node<K, V> *node = this->_header->forward[i];
std::cout << "Level " << i << ": ";
while (node != NULL) {
std::cout << node->get_key() << ":" << node->get_value() << ";";
node = node->forward[i];
}
std::cout << std::endl;
}
}

2.4.9 数据落盘、数据加载函数及其辅助函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Dump data in memory to file 
template<typename K, typename V>
void SkipList<K, V>::dump_file() {
std::cout << "dump_file..." << std::endl;
_file_writer.open(STORE_FILE); // 打开文件,写操作
Node<K, V> *node = this->_header->forward[0];
// 只写入键值对,放弃层信息
while (node != NULL) {
// 文件写入(key value 以 : 为分隔符),及信息打印
_file_writer << node->get_key() << ":" << node->get_value() << "\n";
std::cout << node->get_key() << ":" << node->get_value() << ";\n";
node = node->forward[0];
}
_file_writer.flush();
_file_writer.close();
return ;
}

// Load data from disk
template<typename K, typename V>
void SkipList<K, V>::load_file() {
_file_reader.open(STORE_FILE); // 打开文件,读操作
std::cout << "load_file..." << std::endl;
std::string line;
// key 与 value 是一个指向 string 对象的指针
std::string* key = new std::string();
std::string* value = new std::string();
while (getline(_file_reader, line)) { // 一行一行写入
get_key_value_from_string(line, key, value); // 辅助函数
if (key->empty() || value->empty()) {
continue;
}
// 重新载入过程使用 insert_element()
// 所以层之间的关系(各节点的层高)可能发生变化, 所以与之前的SkipList不同
insert_element(*key, *value);
std::cout << "key:" << *key << "value:" << *value << std::endl;
}
_file_reader.close();
}
// 辅助函数
template<typename K, typename V>
void SkipList<K, V>::get_key_value_from_string(const std::string& str, std::string* key, std::string* value) {
if(!is_valid_string(str)) {
return;
}
// 分隔符之前的为 key, 分隔符之后的为 value
*key = str.substr(0, str.find(delimiter));
*value = str.substr(str.find(delimiter) + 1, str.length());
}

template<typename K, typename V>
bool SkipList<K, V>::is_valid_string(const std::string& str) {

if (str.empty()) {
return false;
}
// 没有发现分隔符
if (str.find(delimiter) == std::string::npos) {
return false;
}
return true;
}

说明 🌞 :代码详细解释见注释。


2.4.10 clear()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename K, typename V> 
void SkipList<K, V>::clear() {
std::cout << "clear ..." << std::endl;
Node<K, V> *node = this->_header->forward[0];
// 删除节点
while (node != NULL) {
Node<K, V> *temp = node;
node = node->forward[0];
delete temp;
}
// 重新初始化 _header
for (int i = 0; i <= _max_level; i++) {
this->_header->forward[i] = 0;
}
this->_skip_list_level = 0;
this->_element_count = 0;
}

说明 🌞 :代码详细解释见注释。

注意 🙋‍♂ :清空除了头节点以外的节点并且头节点 forward 初始化;


3 项目测试

3.1 skiplist.h 测试

针对所有提供的 API 进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <iostream>
#include "skiplist.h"
#define FILE_PATH "./store/dumpFile"

int main() {
SkipList<std::string, std::string> skipList(6);
// insert 测试
std::cout << "--- insert 测试 ---" << std::endl;
skipList.insert_element("1", " one");
skipList.insert_element("2", " two");
skipList.insert_element("3", " three");
skipList.insert_element("abc", " 测试1");
skipList.insert_element("中文", " 测试2");

std::cout << "skipList size after insert_element(): " << skipList.size() << std::endl; // 5

// search 测试
std::cout << "--- search 测试 ---" << std::endl;
skipList.search_element("3");
skipList.search_element("中文");
skipList.search_element("4");

// delete 测试
std::cout << "--- delete 测试 ---" << std::endl;
skipList.delete_element("3"); // 成功
skipList.delete_element("5"); // 失败
std::cout << "skipList size after delete_element(): " << skipList.size() << std::endl; // 4

// update 测试
std::cout << "--- update 测试 ---" << std::endl;
skipList.update_element("abc", " update 测试");
skipList.update_element("5", " update_false 测试");
skipList.update_element("5", " update_true 测试", true);
std::cout << "skipList size after update_element(): " << skipList.size() << std::endl; // 5

// dump_file 测试
std::cout << "--- dump_file 测试 ---" << std::endl;
skipList.dump_file();

// display 测试
std::cout << "--- display 测试 ---" << std::endl;
skipList.display_list();

// clear 测试
std::cout << "--- clear 测试 ---" << std::endl;
skipList.clear();
std::cout << "skipList size after clear(): " << skipList.size() << std::endl; // 0

// load_file 测试
std::cout << "--- load_file 测试 ---" << std::endl;
skipList.load_file();
std::cout << "skipList size after load_file(): " << skipList.size() << std::endl; // 4
skipList.display_list();

return 0;
}

测试结果 ⭐️

🔹 插入操作:

🔸 查找操作:

🔹 更新操作:

🔸 删除操作:

🔹 数据落盘操作:

🔸 打印跳表操作:

🔹 清空跳表操作:

🔸 数据加载操作:

🔹 加载后的跳表:

注意 🙋‍♂ :

1️⃣ 数据落盘只是存储了键值对;

2️⃣ 数据加载是按照文件中的键值对重新进行 insert 操作;

3️⃣ 数据加载后的跳表节点数和内容相同,但是层数和之间的关系可能变化。


3.2 增删改查 QPS 测试

对增删改查进行测试。测试方法:设定固定的操作数,设置层高为 18,采用随机插入数据,看各个操作运行时间。

具体代码见:stress-test/stress_test.cpp

运行设备 :阿里云服务器 ECS 共享型 n4 (1核2G, 1 M 带宽)

运行结果

结果统计

具体操作\数据规模10 w50 w100 w
插入0.099330 s0.840242 s2.09992 s
删除0.084905 s0.777721 s1.97519 s
修改0.106549 s0.971254 s2.42625 s
查看0.089328 s0.862964 s2.15155 s

以 100 w 数据结果计算 QPS:

🔹 插入 :47.62 w 🔸 删除 :50. 63 w 🔹 修改 :41.22 w 🔸 查看 :46.48 w

😲 不知为什么,比项目 README 文档中测试的结果要快一倍。



----------- 本文结束 -----------




0%