16
16
17
17
#include "raftable.h"
18
18
19
/* Size in bytes of a key buffer (RaftableKey.data). */
#define RAFTABLE_KEY_LEN (64)
/* Size in bytes of one RaftableBlock, link pointer included. */
#define RAFTABLE_BLOCK_LEN (256)
/* Number of blocks in the shared block pool. */
#define RAFTABLE_BLOCKS (4096)
/*
 * Bytes needed for the block pool: the RaftableBlockMem header followed by
 * RAFTABLE_BLOCKS blocks. Only usable after the types below are declared.
 */
#define RAFTABLE_BLOCK_MEM \
	(offsetof(RaftableBlockMem, blocks) + sizeof(RaftableBlock) * RAFTABLE_BLOCKS)
#define RAFTABLE_HASH_SIZE (127)
/* Total shared memory requested: 1 MB plus the block pool. */
#define RAFTABLE_SHMEM_SIZE ((1024 * 1024) + RAFTABLE_BLOCK_MEM)

/* Normally supplied by pg_config.h; this fallback selects the C99 form. */
#ifndef FLEXIBLE_ARRAY_MEMBER
#define FLEXIBLE_ARRAY_MEMBER
#endif

/*
 * One fixed-size chunk of value storage. Blocks are chained through 'next'
 * both when they hold a value and when they sit on the free list.
 */
typedef struct RaftableBlock
{
	struct RaftableBlock *next;
	char data[RAFTABLE_BLOCK_LEN - sizeof(void *)];
} RaftableBlock;

/* Fixed-size key buffer used as the hash table key. */
typedef struct RaftableKey
{
	char data[RAFTABLE_KEY_LEN];
} RaftableKey;

/*
 * Hash table entry: the key, the value length in bytes, and the head of the
 * block chain holding the value (NULL when there is no value data).
 */
typedef struct RaftableEntry
{
	RaftableKey key;
	int len;
	RaftableBlock *value;
} RaftableEntry;

/*
 * Block pool header. 'blocks' is a flexible array member so that indexing
 * beyond the first element stays within the declared type; the real element
 * count is RAFTABLE_BLOCKS and the allocation size is RAFTABLE_BLOCK_MEM.
 */
typedef struct RaftableBlockMem
{
	RaftableBlock *free_blocks;
	RaftableBlock blocks[FLEXIBLE_ARRAY_MEMBER];
} RaftableBlockMem;
22
49
23
50
void _PG_init (void );
24
51
void _PG_fini (void );
@@ -29,53 +56,183 @@ PG_FUNCTION_INFO_V1(raftable_sql_get);
29
56
PG_FUNCTION_INFO_V1 (raftable_sql_set );
30
57
PG_FUNCTION_INFO_V1 (raftable_sql_list );
31
58
32
- static HTAB * data ;
33
- static LWLockId datalock ;
59
+ static HTAB * hashtable ;
60
+ static LWLockId hashlock ;
61
+
62
+ static RaftableBlockMem * blockmem ;
63
+ static LWLockId blocklock ;
64
+
34
65
static shmem_startup_hook_type PreviousShmemStartupHook ;
35
66
36
- typedef struct RaftableEntry
67
+ static RaftableBlock * block_alloc ( void )
37
68
{
38
- int key ;
39
- char value [RAFTABLE_VALUE_LEN ];
40
- } RaftableEntry ;
69
+ RaftableBlock * result ;
70
+
71
+ LWLockAcquire (blocklock , LW_EXCLUSIVE );
72
+
73
+ result = blockmem -> free_blocks ;
74
+ if (result == NULL )
75
+ elog (ERROR , "raftable memory limit hit" );
76
+
77
+
78
+ blockmem -> free_blocks = blockmem -> free_blocks -> next ;
79
+ result -> next = NULL ;
80
+ LWLockRelease (blocklock );
81
+ return result ;
82
+ }
83
+
84
+ static void block_free (RaftableBlock * block )
85
+ {
86
+ RaftableBlock * new_free_head = block ;
87
+ Assert (block != NULL );
88
+ while (block -> next != NULL ) {
89
+ block = block -> next ;
90
+ }
91
+ LWLockAcquire (blocklock , LW_EXCLUSIVE );
92
+ block -> next = blockmem -> free_blocks ;
93
+ blockmem -> free_blocks = new_free_head ;
94
+ LWLockRelease (blocklock );
95
+ }
96
+
97
+
98
+ static text * entry_to_text (RaftableEntry * e )
99
+ {
100
+ char * cursor , * buf ;
101
+ RaftableBlock * block ;
102
+ text * t ;
103
+ int len ;
104
+
105
+ buf = palloc (e -> len + 1 );
106
+ cursor = buf ;
107
+
108
+ block = e -> value ;
109
+ len = e -> len ;
110
+ while (block != NULL )
111
+ {
112
+ int tocopy = len ;
113
+ if (tocopy > sizeof (block -> data ))
114
+ tocopy = sizeof (block -> data );
115
+
116
+ memcpy (cursor , block -> data , tocopy );
117
+ cursor += tocopy ;
118
+ len -= tocopy ;
119
+
120
+ Assert (cursor - buf <= e -> len );
121
+ block = block -> next ;
122
+ }
123
+ Assert (cursor - buf == e -> len );
124
+ * cursor = '\0' ;
125
+ t = cstring_to_text_with_len (buf , e -> len );
126
+ pfree (buf );
127
+ return t ;
128
+ }
129
+
130
+ static void text_to_entry (RaftableEntry * e , text * t )
131
+ {
132
+ char * buf , * cursor ;
133
+ int len ;
134
+ RaftableBlock * block ;
135
+
136
+ buf = text_to_cstring (t );
137
+ cursor = buf ;
138
+ len = strlen (buf );
139
+ e -> len = len ;
140
+
141
+ if (e -> len > 0 )
142
+ {
143
+ if (e -> value == NULL )
144
+ e -> value = block_alloc ();
145
+ Assert (e -> value != NULL );
146
+ }
147
+ else
148
+ {
149
+ if (e -> value != NULL )
150
+ {
151
+ block_free (e -> value );
152
+ e -> value = NULL ;
153
+ }
154
+ }
155
+
156
+ block = e -> value ;
157
+ while (len > 0 )
158
+ {
159
+ int tocopy = len ;
160
+ if (tocopy > sizeof (block -> data ))
161
+ {
162
+ tocopy = sizeof (block -> data );
163
+ if (block -> next == NULL )
164
+ block -> next = block_alloc ();
165
+ }
166
+ else
167
+ {
168
+ if (block -> next != NULL )
169
+ {
170
+ block_free (block -> next );
171
+ block -> next = NULL ;
172
+ }
173
+ }
174
+
175
+ memcpy (block -> data , cursor , tocopy );
176
+ cursor += tocopy ;
177
+ len -= tocopy ;
178
+
179
+ block = block -> next ;
180
+ }
181
+
182
+ pfree (buf );
183
+ Assert (block == NULL );
184
+ }
41
185
42
186
Datum
43
187
raftable_sql_get (PG_FUNCTION_ARGS )
44
188
{
45
189
RaftableEntry * entry ;
46
- int key = PG_GETARG_INT32 (0 );
190
+ RaftableKey key ;
191
+ text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
47
192
48
- LWLockAcquire (datalock , LW_SHARED );
49
- entry = hash_search (data , & key , HASH_FIND , NULL );
193
+ LWLockAcquire (hashlock , LW_SHARED );
194
+ entry = hash_search (hashtable , & key , HASH_FIND , NULL );
50
195
51
196
if (entry )
52
197
{
53
- text * t = cstring_to_text (entry -> value );
54
- LWLockRelease (datalock );
198
+ text * t = entry_to_text (entry );
199
+ LWLockRelease (hashlock );
55
200
PG_RETURN_TEXT_P (t );
56
201
}
57
202
else
58
203
{
59
- LWLockRelease (datalock );
204
+ LWLockRelease (hashlock );
60
205
PG_RETURN_NULL ();
61
206
}
62
207
}
63
208
64
209
Datum
65
210
raftable_sql_set (PG_FUNCTION_ARGS )
66
211
{
67
- int key = PG_GETARG_INT32 (0 );
212
+ RaftableKey key ;
213
+ text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
68
214
69
- LWLockAcquire (datalock , LW_EXCLUSIVE );
215
+ LWLockAcquire (hashlock , LW_EXCLUSIVE );
70
216
if (PG_ARGISNULL (1 ))
71
- hash_search (data , & key , HASH_REMOVE , NULL );
217
+ {
218
+ RaftableEntry * entry = hash_search (hashtable , key .data , HASH_FIND , NULL );
219
+ if ((entry != NULL ) && (entry -> len > 0 ))
220
+ block_free (entry -> value );
221
+ hash_search (hashtable , key .data , HASH_REMOVE , NULL );
222
+ }
72
223
else
73
224
{
74
- RaftableEntry * entry = hash_search (data , & key , HASH_ENTER , NULL );
75
- entry -> key = key ;
76
- text_to_cstring_buffer (PG_GETARG_TEXT_P (1 ), entry -> value , sizeof (entry -> value ));
225
+ bool found ;
226
+ RaftableEntry * entry = hash_search (hashtable , key .data , HASH_ENTER , & found );
227
+ if (!found )
228
+ {
229
+ entry -> key = key ;
230
+ entry -> value = NULL ;
231
+ entry -> len = 0 ;
232
+ }
233
+ text_to_entry (entry , PG_GETARG_TEXT_P (1 ));
77
234
}
78
- LWLockRelease (datalock );
235
+ LWLockRelease (hashlock );
79
236
80
237
PG_RETURN_VOID ();
81
238
}
@@ -96,12 +253,15 @@ raftable_sql_list(PG_FUNCTION_ARGS)
96
253
oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
97
254
98
255
tfc = get_call_result_type (fcinfo , NULL , & funcctx -> tuple_desc );
99
- Assert (tfc == TYPEFUNC_COMPOSITE );
256
+ if (tfc != TYPEFUNC_COMPOSITE )
257
+ {
258
+ elog (ERROR , "raftable listing function should be composite" );
259
+ }
100
260
funcctx -> tuple_desc = BlessTupleDesc (funcctx -> tuple_desc );
101
261
102
262
scan = (HASH_SEQ_STATUS * )palloc (sizeof (HASH_SEQ_STATUS ));
103
- LWLockAcquire (datalock , LW_SHARED );
104
- hash_seq_init (scan , data );
263
+ LWLockAcquire (hashlock , LW_SHARED );
264
+ hash_seq_init (scan , hashtable );
105
265
106
266
MemoryContextSwitchTo (oldcontext );
107
267
@@ -117,52 +277,59 @@ raftable_sql_list(PG_FUNCTION_ARGS)
117
277
Datum vals [2 ];
118
278
bool isnull [2 ];
119
279
120
- vals [0 ] = Int32GetDatum (entry -> key );
121
- vals [1 ] = CStringGetTextDatum ( entry -> value );
280
+ vals [0 ] = CStringGetTextDatum (entry -> key . data );
281
+ vals [1 ] = PointerGetDatum ( entry_to_text ( entry ) );
122
282
isnull [0 ] = isnull [1 ] = false;
123
283
124
284
tuple = heap_form_tuple (funcctx -> tuple_desc , vals , isnull );
125
285
SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
126
286
}
127
287
else
128
288
{
129
- LWLockRelease (datalock );
289
+ LWLockRelease (hashlock );
130
290
SRF_RETURN_DONE (funcctx );
131
291
}
132
292
133
293
}
134
294
135
- static uint32 raftable_hash_fn (const void * key , Size keysize )
136
- {
137
- return (uint32 )* (int * )key ;
138
- }
139
-
140
- static int raftable_match_fn (const void * key1 , const void * key2 , Size keysize )
141
- {
142
- return * (int * )key1 != * (int * )key2 ;
143
- }
144
-
145
295
static void startup_shmem (void )
146
296
{
147
- HASHCTL info ;
148
-
149
297
if (PreviousShmemStartupHook ){
150
298
PreviousShmemStartupHook ();
151
299
}
152
300
153
- datalock = LWLockAssign ();
301
+ {
302
+ HASHCTL info ;
303
+ hashlock = LWLockAssign ();
154
304
155
- info .keysize = sizeof (int );
156
- info .entrysize = sizeof (RaftableEntry );
157
- info .hash = raftable_hash_fn ;
158
- info .match = raftable_match_fn ;
305
+ info .keysize = sizeof (RaftableKey );
306
+ info .entrysize = sizeof (RaftableEntry );
159
307
160
- data = ShmemInitHash (
161
- "raftable" ,
162
- RAFTABLE_HASH_SIZE , RAFTABLE_HASH_SIZE ,
163
- & info ,
164
- HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
165
- );
308
+ hashtable = ShmemInitHash (
309
+ "raftable_hashtable" ,
310
+ RAFTABLE_HASH_SIZE , RAFTABLE_HASH_SIZE ,
311
+ & info , HASH_ELEM
312
+ );
313
+ }
314
+
315
+ {
316
+ bool found ;
317
+ int i ;
318
+
319
+ blocklock = LWLockAssign ();
320
+
321
+ blockmem = ShmemInitStruct (
322
+ "raftable_blockmem" ,
323
+ RAFTABLE_BLOCK_MEM ,
324
+ & found
325
+ );
326
+
327
+ for (i = 0 ; i < RAFTABLE_BLOCKS - 1 ; i ++ ) {
328
+ blockmem -> blocks [i ].next = blockmem -> blocks + i + 1 ;
329
+ }
330
+ blockmem -> blocks [RAFTABLE_BLOCKS - 1 ].next = NULL ;
331
+ blockmem -> free_blocks = blockmem -> blocks ;
332
+ }
166
333
}
167
334
168
335
void
@@ -171,7 +338,7 @@ _PG_init(void)
171
338
if (!process_shared_preload_libraries_in_progress )
172
339
elog (ERROR , "please add 'raftable' to shared_preload_libraries list" );
173
340
RequestAddinShmemSpace (RAFTABLE_SHMEM_SIZE );
174
- RequestAddinLWLocks (1 );
341
+ RequestAddinLWLocks (2 );
175
342
176
343
PreviousShmemStartupHook = shmem_startup_hook ;
177
344
shmem_startup_hook = startup_shmem ;
0 commit comments