Skip to content

Commit 2159e69

Browse files
committed
Implement a key-value table in shared memory with getter, setter and list methods.
1 parent 8f21156 commit 2159e69

File tree

6 files changed

+269
-0
lines changed

6 files changed

+269
-0
lines changed

contrib/raftable/Makefile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
MODULES = raftable
2+
EXTENSION = raftable
3+
DATA = raftable--1.0.sql
4+
5+
ifdef USE_PGXS
6+
PG_CONFIG = pg_config
7+
PGXS := $(shell $(PG_CONFIG) --pgxs)
8+
include $(PGXS)
9+
else
10+
subdir = contrib/raftable
11+
top_builddir = ../..
12+
include $(top_builddir)/src/Makefile.global
13+
include $(top_srcdir)/contrib/contrib-global.mk
14+
endif

contrib/raftable/README

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
This extension allows you to have a key-value table replicated between several
2+
Postgres instances over Raft protocol.
3+
4+
C API:
5+
/* Gets value by key. Returns the value or NULL if not found. */
6+
char *raftable_get(int key);
7+
8+
/*
9+
* Adds/updates value by key. Returns when the value gets replicated.
10+
* Storing NULL will delete the item from the table.
11+
*/
12+
void raftable_set(int key, char *value);
13+
14+
/*
15+
* Iterates over all items in the table, calling func(key, value, arg)
16+
* for each of them.
17+
*/
18+
void raftable_every(void (*func)(int, char *, void *), void *arg);
19+
20+
SQL API:
21+
-- set
22+
raftable(key int, value text);
23+
24+
-- get
25+
raftable(key int) returns text;
26+
27+
-- list
28+
raftable() returns table (key int, value text);

contrib/raftable/raftable--1.0.sql

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
2+
\echo Use "CREATE EXTENSION raftable" to load this file. \quit
3+
4+
-- get
5+
CREATE FUNCTION raftable(key int)
6+
RETURNS text
7+
AS 'MODULE_PATHNAME','raftable_sql_get'
8+
LANGUAGE C;
9+
10+
-- set
11+
CREATE FUNCTION raftable(key int, value text)
12+
RETURNS void
13+
AS 'MODULE_PATHNAME','raftable_sql_set'
14+
LANGUAGE C;
15+
16+
-- list
17+
CREATE FUNCTION raftable()
18+
RETURNS table (key int, value text)
19+
AS 'MODULE_PATHNAME','raftable_sql_list'
20+
LANGUAGE C;

contrib/raftable/raftable.c

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* raftable.c
3+
*
4+
* A key-value table replicated over Raft.
5+
*
6+
*/
7+
8+
#include "postgres.h"
9+
#include "utils/builtins.h"
10+
#include "utils/hsearch.h"
11+
#include "storage/lwlock.h"
12+
#include "storage/ipc.h"
13+
#include "funcapi.h"
14+
#include "access/htup_details.h"
15+
#include "miscadmin.h"
16+
17+
#include "raftable.h"
18+
19+
#define RAFTABLE_SHMEM_SIZE (16 * 1024)
20+
#define RAFTABLE_HASH_SIZE (127)
21+
#define RAFTABLE_VALUE_LEN 64
22+
23+
void _PG_init(void);
24+
void _PG_fini(void);
25+
26+
PG_MODULE_MAGIC;
27+
28+
PG_FUNCTION_INFO_V1(raftable_sql_get);
29+
PG_FUNCTION_INFO_V1(raftable_sql_set);
30+
PG_FUNCTION_INFO_V1(raftable_sql_list);
31+
32+
static HTAB *data;
33+
static LWLockId datalock;
34+
static shmem_startup_hook_type PreviousShmemStartupHook;
35+
36+
typedef struct RaftableEntry
37+
{
38+
int key;
39+
char value[RAFTABLE_VALUE_LEN];
40+
} RaftableEntry;
41+
42+
Datum
43+
raftable_sql_get(PG_FUNCTION_ARGS)
44+
{
45+
RaftableEntry *entry;
46+
int key = PG_GETARG_INT32(0);
47+
48+
LWLockAcquire(datalock, LW_SHARED);
49+
entry = hash_search(data, &key, HASH_FIND, NULL);
50+
51+
if (entry)
52+
{
53+
text *t = cstring_to_text(entry->value);
54+
LWLockRelease(datalock);
55+
PG_RETURN_TEXT_P(t);
56+
}
57+
else
58+
{
59+
LWLockRelease(datalock);
60+
PG_RETURN_NULL();
61+
}
62+
}
63+
64+
Datum
65+
raftable_sql_set(PG_FUNCTION_ARGS)
66+
{
67+
int key = PG_GETARG_INT32(0);
68+
69+
LWLockAcquire(datalock, LW_EXCLUSIVE);
70+
if (PG_ARGISNULL(1))
71+
hash_search(data, &key, HASH_REMOVE, NULL);
72+
else
73+
{
74+
RaftableEntry *entry = hash_search(data, &key, HASH_ENTER, NULL);
75+
entry->key = key;
76+
text_to_cstring_buffer(PG_GETARG_TEXT_P(1), entry->value, sizeof(entry->value));
77+
}
78+
LWLockRelease(datalock);
79+
80+
PG_RETURN_VOID();
81+
}
82+
83+
Datum
84+
raftable_sql_list(PG_FUNCTION_ARGS)
85+
{
86+
FuncCallContext *funcctx;
87+
MemoryContext oldcontext;
88+
HASH_SEQ_STATUS *scan;
89+
RaftableEntry *entry;
90+
91+
if (SRF_IS_FIRSTCALL())
92+
{
93+
TypeFuncClass tfc;
94+
funcctx = SRF_FIRSTCALL_INIT();
95+
96+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
97+
98+
tfc = get_call_result_type(fcinfo, NULL, &funcctx->tuple_desc);
99+
Assert(tfc == TYPEFUNC_COMPOSITE);
100+
funcctx->tuple_desc = BlessTupleDesc(funcctx->tuple_desc);
101+
102+
scan = (HASH_SEQ_STATUS *)palloc(sizeof(HASH_SEQ_STATUS));
103+
LWLockAcquire(datalock, LW_SHARED);
104+
hash_seq_init(scan, data);
105+
106+
MemoryContextSwitchTo(oldcontext);
107+
108+
funcctx->user_fctx = scan;
109+
}
110+
111+
funcctx = SRF_PERCALL_SETUP();
112+
scan = funcctx->user_fctx;
113+
114+
if ((entry = (RaftableEntry *)hash_seq_search(scan)))
115+
{
116+
HeapTuple tuple;
117+
Datum vals[2];
118+
bool isnull[2];
119+
120+
vals[0] = Int32GetDatum(entry->key);
121+
vals[1] = CStringGetTextDatum(entry->value);
122+
isnull[0] = isnull[1] = false;
123+
124+
tuple = heap_form_tuple(funcctx->tuple_desc, vals, isnull);
125+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
126+
}
127+
else
128+
{
129+
LWLockRelease(datalock);
130+
SRF_RETURN_DONE(funcctx);
131+
}
132+
133+
}
134+
135+
static uint32 raftable_hash_fn(const void *key, Size keysize)
136+
{
137+
return (uint32)*(int*)key;
138+
}
139+
140+
static int raftable_match_fn(const void *key1, const void *key2, Size keysize)
141+
{
142+
return *(int*)key1 != *(int*)key2;
143+
}
144+
145+
static void startup_shmem(void)
146+
{
147+
HASHCTL info;
148+
149+
if (PreviousShmemStartupHook){
150+
PreviousShmemStartupHook();
151+
}
152+
153+
datalock = LWLockAssign();
154+
155+
info.keysize = sizeof(int);
156+
info.entrysize = sizeof(RaftableEntry);
157+
info.hash = raftable_hash_fn;
158+
info.match = raftable_match_fn;
159+
160+
data = ShmemInitHash(
161+
"raftable",
162+
RAFTABLE_HASH_SIZE, RAFTABLE_HASH_SIZE,
163+
&info,
164+
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
165+
);
166+
}
167+
168+
void
169+
_PG_init(void)
170+
{
171+
if (!process_shared_preload_libraries_in_progress)
172+
elog(ERROR, "please add 'raftable' to shared_preload_libraries list");
173+
RequestAddinShmemSpace(RAFTABLE_SHMEM_SIZE);
174+
RequestAddinLWLocks(1);
175+
176+
PreviousShmemStartupHook = shmem_startup_hook;
177+
shmem_startup_hook = startup_shmem;
178+
}
179+
180+
void
181+
_PG_fini(void)
182+
{
183+
shmem_startup_hook = PreviousShmemStartupHook;
184+
}

contrib/raftable/raftable.control

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
comment = 'Raftable'
2+
default_version = '1.0'
3+
module_pathname = '$libdir/raftable'
4+
relocatable = true

contrib/raftable/raftable.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#ifndef __RAFTABLE_H__
2+
#define __RAFTABLE_H__
3+
4+
/* Gets value by key. Returns the value or NULL if not found. */
5+
char *raftable_get(int key);
6+
7+
/*
8+
* Adds/updates value by key. Returns when the value gets replicated.
9+
* Storing NULL will delete the item from the table.
10+
*/
11+
void raftable_set(int key, char *value);
12+
13+
/*
14+
* Iterates over all items in the table, calling func(key, value, arg)
15+
* for each of them.
16+
*/
17+
void raftable_every(void (*func)(int, char *, void *), void *arg);
18+
19+
#endif

0 commit comments

Comments
 (0)