Skip to content

Commit 3307c9a

Browse files
committed
Switch to varchar keys and arbitrary-length values in raftable.
1 parent 2159e69 commit 3307c9a

File tree

2 files changed

+221
-54
lines changed

2 files changed

+221
-54
lines changed

contrib/raftable/raftable--1.0.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
\echo Use "CREATE EXTENSION raftable" to load this file. \quit
33

44
-- get
5-
CREATE FUNCTION raftable(key int)
5+
CREATE FUNCTION raftable(key varchar(64))
66
RETURNS text
77
AS 'MODULE_PATHNAME','raftable_sql_get'
88
LANGUAGE C;
99

1010
-- set
11-
CREATE FUNCTION raftable(key int, value text)
11+
CREATE FUNCTION raftable(key varchar(64), value text)
1212
RETURNS void
1313
AS 'MODULE_PATHNAME','raftable_sql_set'
1414
LANGUAGE C;
1515

1616
-- list
1717
CREATE FUNCTION raftable()
18-
RETURNS table (key int, value text)
18+
RETURNS table (key varchar(64), value text)
1919
AS 'MODULE_PATHNAME','raftable_sql_list'
2020
LANGUAGE C;

contrib/raftable/raftable.c

Lines changed: 218 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,36 @@
1616

1717
#include "raftable.h"
1818

19-
#define RAFTABLE_SHMEM_SIZE (16 * 1024)
19+
#define RAFTABLE_KEY_LEN (64)
20+
#define RAFTABLE_BLOCK_LEN (256)
21+
#define RAFTABLE_BLOCKS (4096)
22+
#define RAFTABLE_BLOCK_MEM (RAFTABLE_BLOCK_LEN * (RAFTABLE_BLOCKS - 1) + sizeof(RaftableBlockMem))
2023
#define RAFTABLE_HASH_SIZE (127)
21-
#define RAFTABLE_VALUE_LEN 64
24+
#define RAFTABLE_SHMEM_SIZE ((1024 * 1024) + RAFTABLE_BLOCK_MEM)
25+
26+
typedef struct RaftableBlock
27+
{
28+
struct RaftableBlock *next;
29+
char data[RAFTABLE_BLOCK_LEN - sizeof(void*)];
30+
} RaftableBlock;
31+
32+
typedef struct RaftableKey
33+
{
34+
char data[RAFTABLE_KEY_LEN];
35+
} RaftableKey;
36+
37+
typedef struct RaftableEntry
38+
{
39+
RaftableKey key;
40+
int len;
41+
RaftableBlock *value;
42+
} RaftableEntry;
43+
44+
typedef struct RaftableBlockMem
45+
{
46+
RaftableBlock *free_blocks;
47+
RaftableBlock blocks[1];
48+
} RaftableBlockMem;
2249

2350
void _PG_init(void);
2451
void _PG_fini(void);
@@ -29,53 +56,183 @@ PG_FUNCTION_INFO_V1(raftable_sql_get);
2956
PG_FUNCTION_INFO_V1(raftable_sql_set);
3057
PG_FUNCTION_INFO_V1(raftable_sql_list);
3158

32-
static HTAB *data;
33-
static LWLockId datalock;
59+
static HTAB *hashtable;
60+
static LWLockId hashlock;
61+
62+
static RaftableBlockMem *blockmem;
63+
static LWLockId blocklock;
64+
3465
static shmem_startup_hook_type PreviousShmemStartupHook;
3566

36-
typedef struct RaftableEntry
67+
static RaftableBlock *block_alloc(void)
3768
{
38-
int key;
39-
char value[RAFTABLE_VALUE_LEN];
40-
} RaftableEntry;
69+
RaftableBlock *result;
70+
71+
LWLockAcquire(blocklock, LW_EXCLUSIVE);
72+
73+
result = blockmem->free_blocks;
74+
if (result == NULL)
75+
elog(ERROR, "raftable memory limit hit");
76+
77+
78+
blockmem->free_blocks = blockmem->free_blocks->next;
79+
result->next = NULL;
80+
LWLockRelease(blocklock);
81+
return result;
82+
}
83+
84+
static void block_free(RaftableBlock *block)
85+
{
86+
RaftableBlock *new_free_head = block;
87+
Assert(block != NULL);
88+
while (block->next != NULL) {
89+
block = block->next;
90+
}
91+
LWLockAcquire(blocklock, LW_EXCLUSIVE);
92+
block->next = blockmem->free_blocks;
93+
blockmem->free_blocks = new_free_head;
94+
LWLockRelease(blocklock);
95+
}
96+
97+
98+
static text *entry_to_text(RaftableEntry *e)
99+
{
100+
char *cursor, *buf;
101+
RaftableBlock *block;
102+
text *t;
103+
int len;
104+
105+
buf = palloc(e->len + 1);
106+
cursor = buf;
107+
108+
block = e->value;
109+
len = e->len;
110+
while (block != NULL)
111+
{
112+
int tocopy = len;
113+
if (tocopy > sizeof(block->data))
114+
tocopy = sizeof(block->data);
115+
116+
memcpy(cursor, block->data, tocopy);
117+
cursor += tocopy;
118+
len -= tocopy;
119+
120+
Assert(cursor - buf <= e->len);
121+
block = block->next;
122+
}
123+
Assert(cursor - buf == e->len);
124+
*cursor = '\0';
125+
t = cstring_to_text_with_len(buf, e->len);
126+
pfree(buf);
127+
return t;
128+
}
129+
130+
static void text_to_entry(RaftableEntry *e, text *t)
131+
{
132+
char *buf, *cursor;
133+
int len;
134+
RaftableBlock *block;
135+
136+
buf = text_to_cstring(t);
137+
cursor = buf;
138+
len = strlen(buf);
139+
e->len = len;
140+
141+
if (e->len > 0)
142+
{
143+
if (e->value == NULL)
144+
e->value = block_alloc();
145+
Assert(e->value != NULL);
146+
}
147+
else
148+
{
149+
if (e->value != NULL)
150+
{
151+
block_free(e->value);
152+
e->value = NULL;
153+
}
154+
}
155+
156+
block = e->value;
157+
while (len > 0)
158+
{
159+
int tocopy = len;
160+
if (tocopy > sizeof(block->data))
161+
{
162+
tocopy = sizeof(block->data);
163+
if (block->next == NULL)
164+
block->next = block_alloc();
165+
}
166+
else
167+
{
168+
if (block->next != NULL)
169+
{
170+
block_free(block->next);
171+
block->next = NULL;
172+
}
173+
}
174+
175+
memcpy(block->data, cursor, tocopy);
176+
cursor += tocopy;
177+
len -= tocopy;
178+
179+
block = block->next;
180+
}
181+
182+
pfree(buf);
183+
Assert(block == NULL);
184+
}
41185

42186
Datum
43187
raftable_sql_get(PG_FUNCTION_ARGS)
44188
{
45189
RaftableEntry *entry;
46-
int key = PG_GETARG_INT32(0);
190+
RaftableKey key;
191+
text_to_cstring_buffer(PG_GETARG_TEXT_P(0), key.data, sizeof(key.data));
47192

48-
LWLockAcquire(datalock, LW_SHARED);
49-
entry = hash_search(data, &key, HASH_FIND, NULL);
193+
LWLockAcquire(hashlock, LW_SHARED);
194+
entry = hash_search(hashtable, &key, HASH_FIND, NULL);
50195

51196
if (entry)
52197
{
53-
text *t = cstring_to_text(entry->value);
54-
LWLockRelease(datalock);
198+
text *t = entry_to_text(entry);
199+
LWLockRelease(hashlock);
55200
PG_RETURN_TEXT_P(t);
56201
}
57202
else
58203
{
59-
LWLockRelease(datalock);
204+
LWLockRelease(hashlock);
60205
PG_RETURN_NULL();
61206
}
62207
}
63208

64209
Datum
65210
raftable_sql_set(PG_FUNCTION_ARGS)
66211
{
67-
int key = PG_GETARG_INT32(0);
212+
RaftableKey key;
213+
text_to_cstring_buffer(PG_GETARG_TEXT_P(0), key.data, sizeof(key.data));
68214

69-
LWLockAcquire(datalock, LW_EXCLUSIVE);
215+
LWLockAcquire(hashlock, LW_EXCLUSIVE);
70216
if (PG_ARGISNULL(1))
71-
hash_search(data, &key, HASH_REMOVE, NULL);
217+
{
218+
RaftableEntry *entry = hash_search(hashtable, key.data, HASH_FIND, NULL);
219+
if ((entry != NULL) && (entry->len > 0))
220+
block_free(entry->value);
221+
hash_search(hashtable, key.data, HASH_REMOVE, NULL);
222+
}
72223
else
73224
{
74-
RaftableEntry *entry = hash_search(data, &key, HASH_ENTER, NULL);
75-
entry->key = key;
76-
text_to_cstring_buffer(PG_GETARG_TEXT_P(1), entry->value, sizeof(entry->value));
225+
bool found;
226+
RaftableEntry *entry = hash_search(hashtable, key.data, HASH_ENTER, &found);
227+
if (!found)
228+
{
229+
entry->key = key;
230+
entry->value = NULL;
231+
entry->len = 0;
232+
}
233+
text_to_entry(entry, PG_GETARG_TEXT_P(1));
77234
}
78-
LWLockRelease(datalock);
235+
LWLockRelease(hashlock);
79236

80237
PG_RETURN_VOID();
81238
}
@@ -96,12 +253,15 @@ raftable_sql_list(PG_FUNCTION_ARGS)
96253
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
97254

98255
tfc = get_call_result_type(fcinfo, NULL, &funcctx->tuple_desc);
99-
Assert(tfc == TYPEFUNC_COMPOSITE);
256+
if (tfc != TYPEFUNC_COMPOSITE)
257+
{
258+
elog(ERROR, "raftable listing function should be composite");
259+
}
100260
funcctx->tuple_desc = BlessTupleDesc(funcctx->tuple_desc);
101261

102262
scan = (HASH_SEQ_STATUS *)palloc(sizeof(HASH_SEQ_STATUS));
103-
LWLockAcquire(datalock, LW_SHARED);
104-
hash_seq_init(scan, data);
263+
LWLockAcquire(hashlock, LW_SHARED);
264+
hash_seq_init(scan, hashtable);
105265

106266
MemoryContextSwitchTo(oldcontext);
107267

@@ -117,52 +277,59 @@ raftable_sql_list(PG_FUNCTION_ARGS)
117277
Datum vals[2];
118278
bool isnull[2];
119279

120-
vals[0] = Int32GetDatum(entry->key);
121-
vals[1] = CStringGetTextDatum(entry->value);
280+
vals[0] = CStringGetTextDatum(entry->key.data);
281+
vals[1] = PointerGetDatum(entry_to_text(entry));
122282
isnull[0] = isnull[1] = false;
123283

124284
tuple = heap_form_tuple(funcctx->tuple_desc, vals, isnull);
125285
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
126286
}
127287
else
128288
{
129-
LWLockRelease(datalock);
289+
LWLockRelease(hashlock);
130290
SRF_RETURN_DONE(funcctx);
131291
}
132292

133293
}
134294

135-
static uint32 raftable_hash_fn(const void *key, Size keysize)
136-
{
137-
return (uint32)*(int*)key;
138-
}
139-
140-
static int raftable_match_fn(const void *key1, const void *key2, Size keysize)
141-
{
142-
return *(int*)key1 != *(int*)key2;
143-
}
144-
145295
static void startup_shmem(void)
146296
{
147-
HASHCTL info;
148-
149297
if (PreviousShmemStartupHook){
150298
PreviousShmemStartupHook();
151299
}
152300

153-
datalock = LWLockAssign();
301+
{
302+
HASHCTL info;
303+
hashlock = LWLockAssign();
154304

155-
info.keysize = sizeof(int);
156-
info.entrysize = sizeof(RaftableEntry);
157-
info.hash = raftable_hash_fn;
158-
info.match = raftable_match_fn;
305+
info.keysize = sizeof(RaftableKey);
306+
info.entrysize = sizeof(RaftableEntry);
159307

160-
data = ShmemInitHash(
161-
"raftable",
162-
RAFTABLE_HASH_SIZE, RAFTABLE_HASH_SIZE,
163-
&info,
164-
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
165-
);
308+
hashtable = ShmemInitHash(
309+
"raftable_hashtable",
310+
RAFTABLE_HASH_SIZE, RAFTABLE_HASH_SIZE,
311+
&info, HASH_ELEM
312+
);
313+
}
314+
315+
{
316+
bool found;
317+
int i;
318+
319+
blocklock = LWLockAssign();
320+
321+
blockmem = ShmemInitStruct(
322+
"raftable_blockmem",
323+
RAFTABLE_BLOCK_MEM,
324+
&found
325+
);
326+
327+
for (i = 0; i < RAFTABLE_BLOCKS - 1; i++) {
328+
blockmem->blocks[i].next = blockmem->blocks + i + 1;
329+
}
330+
blockmem->blocks[RAFTABLE_BLOCKS - 1].next = NULL;
331+
blockmem->free_blocks = blockmem->blocks;
332+
}
166333
}
167334

168335
void
@@ -171,7 +338,7 @@ _PG_init(void)
171338
if (!process_shared_preload_libraries_in_progress)
172339
elog(ERROR, "please add 'raftable' to shared_preload_libraries list");
173340
RequestAddinShmemSpace(RAFTABLE_SHMEM_SIZE);
174-
RequestAddinLWLocks(1);
341+
RequestAddinLWLocks(2);
175342

176343
PreviousShmemStartupHook = shmem_startup_hook;
177344
shmem_startup_hook = startup_shmem;

0 commit comments

Comments
 (0)