3
3
//
4
4
5
5
#include " write_batch_demo.h"
6
+ #include " write_batch_internal_demo.h"
6
7
7
- using namespace WRITE_BATCH_DEMO ;
8
+ namespace WRITE_BATCH_DEMO {
8
9
9
- WriteBatch::WriteBatch () {
10
- Clear ();
11
- }
10
+ // 最高位为1一直到非1停止就是长度的编码
11
+ // 能这样编码是因为Key值都是字符串,能保证不会大于128
12
+ // 这里将低位放到前面是有特殊用以的,当长度大于128时,我们只需要按照最高位是否是128进行查找
13
+ // 当找到最高位为非128时,说明找到了所有的长度字段,这时包含最后一个非128开头的字符在内所有的字符就是这次的长度字段
14
+ // 这种歌编码方式用在网络发送,数据序列化中非常有用
15
+ // 可以说是序列化数组中同时存储长度和其他字符的完美解决方案
16
+ char *EncodeVarint32 (char *dst, uint32_t v) {
17
+ // Operate on characters as unsigneds
18
+ uint8_t *ptr = reinterpret_cast <uint8_t *>(dst);
19
+ static const int B = 128 ;
20
+ if (v < (1 << 7 )) {
21
+ *(ptr++) = v;
22
+ } else if (v < (1 << 14 )) {
23
+ *(ptr++) = v | B;
24
+ *(ptr++) = v >> 7 ;
25
+ } else if (v < (1 << 21 )) {
26
+ *(ptr++) = v | B;
27
+ *(ptr++) = (v >> 7 ) | B;
28
+ *(ptr++) = v >> 14 ;
29
+ } else if (v < (1 << 28 )) {
30
+ *(ptr++) = v | B;
31
+ *(ptr++) = (v >> 7 ) | B;
32
+ *(ptr++) = (v >> 14 ) | B;
33
+ *(ptr++) = v >> 21 ;
34
+ } else {
35
+ *(ptr++) = v | B;
36
+ *(ptr++) = (v >> 7 ) | B;
37
+ *(ptr++) = (v >> 14 ) | B;
38
+ *(ptr++) = (v >> 21 ) | B;
39
+ *(ptr++) = v >> 28 ;
40
+ }
41
+ return reinterpret_cast <char *>(ptr);
42
+ }
12
43
13
- WriteBatch::~WriteBatch () = default ;
44
+ void PutVarint32 (std::string *dst, uint32_t v) {
45
+ char buf[5 ];
46
+ char *ptr = EncodeVarint32 (buf, v);
47
+ dst->append (buf, ptr - buf);
48
+ }
14
49
15
- void WriteBatch::Put (const leveldb::Slice &key, const leveldb::Slice &value) {
50
+ void PutLengthPrefixedSlice (std::string *dst, const leveldb::Slice &value) {
51
+ PutVarint32 (dst, value.size ());
52
+ dst->append (value.data (), value.size ());
53
+ }
16
54
17
- }
55
+ // 解码
56
+ const char *GetVarint32PtrFallback (const char *p, const char *limit,
57
+ uint32_t *value) {
58
+ uint32_t result = 0 ;
59
+ for (uint32_t shift = 0 ; shift <= 28 && p < limit; shift += 7 ) {
60
+ uint32_t byte = *(reinterpret_cast <const uint8_t *>(p));
61
+ p++;
62
+ if (byte & 128 ) {
63
+ // More bytes are present
64
+ result |= ((byte & 127 ) << shift);
65
+ } else {
66
+ result |= (byte << shift);
67
+ *value = result;
68
+ return reinterpret_cast <const char *>(p);
69
+ }
70
+ }
71
+ return nullptr ;
72
+ }
18
73
19
- void WriteBatch::Delete (const leveldb::Slice &key) {
74
+ inline const char *GetVarint32Ptr (const char *p, const char *limit,
75
+ uint32_t *value) {
76
+ if (p < limit) {
77
+ uint32_t result = *(reinterpret_cast <const uint8_t *>(p));
78
+ if ((result & 128 ) == 0 ) {
79
+ *value = result;
80
+ return p + 1 ;
81
+ }
82
+ }
83
+ return GetVarint32PtrFallback (p, limit, value);
84
+ }
20
85
21
- }
86
+ bool GetVarint32 (leveldb::Slice *input, uint32_t *value) {
87
+ const char *p = input->data ();
88
+ const char *limit = p + input->size ();
89
+ const char *q = GetVarint32Ptr (p, limit, value);
90
+ if (q == nullptr ) {
91
+ return false ;
92
+ } else {
93
+ *input = leveldb::Slice (q, limit - q);
94
+ return true ;
95
+ }
96
+ }
22
97
23
- void WriteBatch::Clear () {
24
- // 清空对象
25
- rep_.clear ();
26
- // 预先设置为指定的头长度
27
- rep_.resize (kHeader );
28
- }
98
+ bool GetLengthPrefixedSlice (leveldb::Slice *input, leveldb::Slice *result) {
99
+ uint32_t len;
100
+ if (WRITE_BATCH_DEMO::GetVarint32 (input, &len) && input->size () >= len) {
101
+ *result = leveldb::Slice (input->data (), len);
102
+ input->remove_prefix (len);
103
+ return true ;
104
+ } else {
105
+ return false ;
106
+ }
107
+ }
29
108
30
- size_t WriteBatch::ApproximateSize () const {
31
- return rep_.size ();
32
- }
33
109
34
- void WriteBatch::Append (const WriteBatch &source) {
110
+ WriteBatch::WriteBatch () {
111
+ // Clear();
112
+ }
35
113
36
- }
114
+ WriteBatch::~WriteBatch () = default ;
37
115
38
- leveldb::Status WriteBatch::Iterate (WriteBatch::Handler *handler) const {
39
- return leveldb::Status ();
40
- }
116
+ void WriteBatch::Put (const leveldb::Slice &key, const leveldb::Slice &value) {
117
+ // 每次put, batch的计数加1
118
+ WriteBatchInternal::SetCount (this , WriteBatchInternal::Count (this ) + 1 );
119
+ // kTypeValue 是put Deletion 是删除
120
+ rep_.push_back (static_cast <char >(leveldb::kTypeValue ));
121
+ // 找到所有
122
+ WRITE_BATCH_DEMO::PutLengthPrefixedSlice (&rep_, key);
123
+ WRITE_BATCH_DEMO::PutLengthPrefixedSlice (&rep_, value);
124
+ }
125
+
126
+ void WriteBatch::Delete (const leveldb::Slice &key) {
127
+ WriteBatchInternal::SetCount (this , WriteBatchInternal::Count (this ) + 1 );
128
+ rep_.push_back (static_cast <char >(leveldb::kTypeDeletion ));
129
+ WRITE_BATCH_DEMO::PutLengthPrefixedSlice (&rep_, key);
130
+ }
131
+
132
+ void WriteBatch::Clear () {
133
+ // 清空对象
134
+ rep_.clear ();
135
+ // 预先设置为指定的头长度
136
+ rep_.resize (kHeader );
137
+ }
138
+
139
+ size_t WriteBatch::ApproximateSize () const {
140
+ return rep_.size ();
141
+ }
142
+
143
+ void WriteBatch::Append (const WriteBatch &source) {
144
+ WriteBatchInternal::Append (this , &source);
145
+ }
146
+
147
+ // 通过,Handler将具体业务和实现分离开来,这样WriteBatch就不需要关心具体业务的东西
148
+ // 只需要处理序列化的Put和Get即可,具体的功能交给Handler实现
149
+ leveldb::Status WriteBatch::Iterate (Handler *handler) const {
150
+ // 1. 将rep_ 转化为 Slice切片以备后面使用
151
+ leveldb::Slice input (rep_);
152
+
153
+ if (input.size () < kHeader ) {
154
+ return leveldb::Status::Corruption (" malformed WriteBatch (too small)" );
155
+ }
156
+
157
+ // 2. 剔除序列化相关的无用数据
158
+ leveldb::Slice key, value;
159
+ int32_t found = 0 ;
160
+
161
+ // 3. 循环处理所有rep_中的事务(key value值)
162
+ while (!input.empty ()) {
163
+ // 执行次数增加
164
+ found ++;
165
+ // 取出tag,用于判断接下来需要处理数据的类型
166
+ char tag = input[0 ];
167
+ input.remove_prefix (1 );
168
+
169
+ switch (tag) {
170
+ case leveldb::kTypeValue :
171
+ if (WRITE_BATCH_DEMO::GetLengthPrefixedSlice (&input, &key) &&
172
+ WRITE_BATCH_DEMO::GetLengthPrefixedSlice (&input, &value)) {
173
+ handler->Put (key, value);
174
+ } else {
175
+ return leveldb::Status::Corruption (" Bad WriteBatch Put operation" );
176
+ }
177
+ break ;
178
+ case leveldb::kTypeDeletion :
179
+ if (WRITE_BATCH_DEMO::GetLengthPrefixedSlice (&input, &key)) {
180
+ handler->Delete (key);
181
+ } else {
182
+ return leveldb::Status::Corruption (" bad WriteBatch Delete operation" );
183
+ }
184
+ break ;
185
+ default :
186
+ return leveldb::Status::Corruption (" unknown WriteBatch tag" );
187
+
188
+ }
189
+ }
190
+
191
+ if (found != WriteBatchInternal::Count (this )) {
192
+ return leveldb::Status::Corruption (" WriteBatch has wrong count" );
193
+ } else {
194
+ return leveldb::Status::OK ();
195
+ }
196
+ }
197
+
198
+ }
0 commit comments