Skip to content

Commit ae7c2a5

Browse files
添加batch实现
1 parent 918d7ad commit ae7c2a5

File tree

4 files changed

+220
-25
lines changed

4 files changed

+220
-25
lines changed

demo/batch_test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ cmake_minimum_required(VERSION 3.9)
77
add_executable(write_batch
88
write_batch_main.cpp
99
write_batch_demo.cpp
10-
write_batch_internal_demo.cpp)
10+
write_batch_internal_demo.cpp
11+
../../util/status.cc)
1112

1213
#target_link_libraries(test
1314
# PUBLIC leveldb)

demo/batch_test/write_batch_demo.cpp

Lines changed: 181 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,196 @@
33
//
44

55
#include "write_batch_demo.h"
6+
#include "write_batch_internal_demo.h"
67

7-
using namespace WRITE_BATCH_DEMO;
8+
namespace WRITE_BATCH_DEMO {
89

9-
WriteBatch::WriteBatch() {
10-
Clear();
11-
}
10+
// 最高位为1一直到非1停止就是长度的编码
11+
// 能这样编码是因为Key值都是字符串,能保证不会大于128
12+
// 这里将低位放到前面是有特殊用以的,当长度大于128时,我们只需要按照最高位是否是128进行查找
13+
// 当找到最高位为非128时,说明找到了所有的长度字段,这时包含最后一个非128开头的字符在内所有的字符就是这次的长度字段
14+
// 这种歌编码方式用在网络发送,数据序列化中非常有用
15+
// 可以说是序列化数组中同时存储长度和其他字符的完美解决方案
16+
char *EncodeVarint32(char *dst, uint32_t v) {
17+
// Operate on characters as unsigneds
18+
uint8_t *ptr = reinterpret_cast<uint8_t *>(dst);
19+
static const int B = 128;
20+
if (v < (1 << 7)) {
21+
*(ptr++) = v;
22+
} else if (v < (1 << 14)) {
23+
*(ptr++) = v | B;
24+
*(ptr++) = v >> 7;
25+
} else if (v < (1 << 21)) {
26+
*(ptr++) = v | B;
27+
*(ptr++) = (v >> 7) | B;
28+
*(ptr++) = v >> 14;
29+
} else if (v < (1 << 28)) {
30+
*(ptr++) = v | B;
31+
*(ptr++) = (v >> 7) | B;
32+
*(ptr++) = (v >> 14) | B;
33+
*(ptr++) = v >> 21;
34+
} else {
35+
*(ptr++) = v | B;
36+
*(ptr++) = (v >> 7) | B;
37+
*(ptr++) = (v >> 14) | B;
38+
*(ptr++) = (v >> 21) | B;
39+
*(ptr++) = v >> 28;
40+
}
41+
return reinterpret_cast<char *>(ptr);
42+
}
1243

13-
WriteBatch::~WriteBatch() = default;
44+
void PutVarint32(std::string *dst, uint32_t v) {
45+
char buf[5];
46+
char *ptr = EncodeVarint32(buf, v);
47+
dst->append(buf, ptr - buf);
48+
}
1449

15-
void WriteBatch::Put(const leveldb::Slice &key, const leveldb::Slice &value) {
50+
void PutLengthPrefixedSlice(std::string *dst, const leveldb::Slice &value) {
51+
PutVarint32(dst, value.size());
52+
dst->append(value.data(), value.size());
53+
}
1654

17-
}
55+
// 解码
56+
const char *GetVarint32PtrFallback(const char *p, const char *limit,
57+
uint32_t *value) {
58+
uint32_t result = 0;
59+
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
60+
uint32_t byte = *(reinterpret_cast<const uint8_t *>(p));
61+
p++;
62+
if (byte & 128) {
63+
// More bytes are present
64+
result |= ((byte & 127) << shift);
65+
} else {
66+
result |= (byte << shift);
67+
*value = result;
68+
return reinterpret_cast<const char *>(p);
69+
}
70+
}
71+
return nullptr;
72+
}
1873

19-
void WriteBatch::Delete(const leveldb::Slice &key) {
74+
inline const char *GetVarint32Ptr(const char *p, const char *limit,
75+
uint32_t *value) {
76+
if (p < limit) {
77+
uint32_t result = *(reinterpret_cast<const uint8_t *>(p));
78+
if ((result & 128) == 0) {
79+
*value = result;
80+
return p + 1;
81+
}
82+
}
83+
return GetVarint32PtrFallback(p, limit, value);
84+
}
2085

21-
}
86+
bool GetVarint32(leveldb::Slice *input, uint32_t *value) {
87+
const char *p = input->data();
88+
const char *limit = p + input->size();
89+
const char *q = GetVarint32Ptr(p, limit, value);
90+
if (q == nullptr) {
91+
return false;
92+
} else {
93+
*input = leveldb::Slice(q, limit - q);
94+
return true;
95+
}
96+
}
2297

23-
void WriteBatch::Clear() {
24-
// 清空对象
25-
rep_.clear();
26-
// 预先设置为指定的头长度
27-
rep_.resize(kHeader);
28-
}
98+
bool GetLengthPrefixedSlice(leveldb::Slice *input, leveldb::Slice *result) {
99+
uint32_t len;
100+
if (WRITE_BATCH_DEMO::GetVarint32(input, &len) && input->size() >= len) {
101+
*result = leveldb::Slice(input->data(), len);
102+
input->remove_prefix(len);
103+
return true;
104+
} else {
105+
return false;
106+
}
107+
}
29108

30-
size_t WriteBatch::ApproximateSize() const {
31-
return rep_.size();
32-
}
33109

34-
void WriteBatch::Append(const WriteBatch &source) {
110+
WriteBatch::WriteBatch() {
111+
//Clear();
112+
}
35113

36-
}
114+
WriteBatch::~WriteBatch() = default;
37115

38-
leveldb::Status WriteBatch::Iterate(WriteBatch::Handler *handler) const {
39-
return leveldb::Status();
40-
}
116+
void WriteBatch::Put(const leveldb::Slice &key, const leveldb::Slice &value) {
117+
// 每次put, batch的计数加1
118+
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
119+
// kTypeValue 是put Deletion 是删除
120+
rep_.push_back(static_cast<char>(leveldb::kTypeValue));
121+
// 找到所有
122+
WRITE_BATCH_DEMO::PutLengthPrefixedSlice(&rep_, key);
123+
WRITE_BATCH_DEMO::PutLengthPrefixedSlice(&rep_, value);
124+
}
125+
126+
void WriteBatch::Delete(const leveldb::Slice &key) {
127+
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
128+
rep_.push_back(static_cast<char>(leveldb::kTypeDeletion));
129+
WRITE_BATCH_DEMO::PutLengthPrefixedSlice(&rep_, key);
130+
}
131+
132+
void WriteBatch::Clear() {
133+
// 清空对象
134+
rep_.clear();
135+
// 预先设置为指定的头长度
136+
rep_.resize(kHeader);
137+
}
138+
139+
size_t WriteBatch::ApproximateSize() const {
140+
return rep_.size();
141+
}
142+
143+
void WriteBatch::Append(const WriteBatch &source) {
144+
WriteBatchInternal::Append(this, &source);
145+
}
146+
147+
// 通过,Handler将具体业务和实现分离开来,这样WriteBatch就不需要关心具体业务的东西
148+
// 只需要处理序列化的Put和Get即可,具体的功能交给Handler实现
149+
leveldb::Status WriteBatch::Iterate(Handler *handler) const {
150+
// 1. 将rep_ 转化为 Slice切片以备后面使用
151+
leveldb::Slice input(rep_);
152+
153+
if (input.size() < kHeader) {
154+
return leveldb::Status::Corruption("malformed WriteBatch (too small)");
155+
}
156+
157+
// 2. 剔除序列化相关的无用数据
158+
leveldb::Slice key, value;
159+
int32_t found = 0;
160+
161+
// 3. 循环处理所有rep_中的事务(key value值)
162+
while (!input.empty()) {
163+
// 执行次数增加
164+
found ++;
165+
// 取出tag,用于判断接下来需要处理数据的类型
166+
char tag = input[0];
167+
input.remove_prefix(1);
168+
169+
switch (tag) {
170+
case leveldb::kTypeValue:
171+
if (WRITE_BATCH_DEMO::GetLengthPrefixedSlice(&input, &key) &&
172+
WRITE_BATCH_DEMO::GetLengthPrefixedSlice(&input, &value)) {
173+
handler->Put(key, value);
174+
} else {
175+
return leveldb::Status::Corruption("Bad WriteBatch Put operation");
176+
}
177+
break;
178+
case leveldb::kTypeDeletion:
179+
if (WRITE_BATCH_DEMO::GetLengthPrefixedSlice(&input, &key)) {
180+
handler->Delete(key);
181+
} else {
182+
return leveldb::Status::Corruption("bad WriteBatch Delete operation");
183+
}
184+
break;
185+
default:
186+
return leveldb::Status::Corruption("unknown WriteBatch tag");
187+
188+
}
189+
}
190+
191+
if (found != WriteBatchInternal::Count(this)) {
192+
return leveldb::Status::Corruption("WriteBatch has wrong count");
193+
} else {
194+
return leveldb::Status::OK();
195+
}
196+
}
197+
198+
}

demo/batch_test/write_batch_demo.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace WRITE_BATCH_DEMO {
2424
public:
2525
class LEVELDB_EXPORT Handler {
2626
public:
27-
virtual ~Handler();
27+
virtual ~Handler() = default;
2828

2929
virtual void Put(const leveldb::Slice &key, const leveldb::Slice &value) = 0;
3030

demo/batch_test/write_batch_main.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,46 @@
55

66
#include "write_batch_demo.h"
77

8+
using namespace WRITE_BATCH_DEMO;
9+
10+
11+
class HandlerDemo : public WRITE_BATCH_DEMO::WriteBatch::Handler {
12+
public:
13+
~HandlerDemo() override = default;
14+
15+
void Put(const leveldb::Slice &key, const leveldb::Slice &value) override {
16+
std::cout << "put : " << key.ToString() << " : " << value.ToString() << std::endl;
17+
}
18+
19+
void Delete(const leveldb::Slice &key) override {
20+
std::cout << "put : " << key.ToString() << std::endl;
21+
22+
}
23+
};
24+
825

926
int main(int argc, char **argv) {
1027

1128

29+
int x = 12;
30+
31+
WRITE_BATCH_DEMO::WriteBatch batch;
32+
HandlerDemo handler;
33+
34+
//
35+
auto key = leveldb::Slice("xiaoming");
36+
auto value = leveldb::Slice("21");
37+
batch.Put(key, value);
38+
//batch.Put(leveldb::Slice("xiaohong"), leveldb::Slice("21"));
39+
//batch.Put(leveldb::Slice("wanger"), leveldb::Slice("21"));
40+
//batch.Put(leveldb::Slice("daxiong"), leveldb::Slice("21"));
41+
//
42+
//batch.Delete("xiaoming");
43+
//
44+
//batch.Iterate(&handler);
45+
46+
47+
1248

1349
return 0;
1450
}

0 commit comments

Comments
 (0)