Skip to content

Commit 5c4fd94

Browse files
committed
Add bgwpool
1 parent dce42c7 commit 5c4fd94

File tree

5 files changed

+173
-6
lines changed

5 files changed

+173
-6
lines changed

contrib/multimaster/bgwpool.c

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#include "bgwpool.h"
2+
3+
typedef struct
4+
{
5+
BgwPool* pool;
6+
int id;
7+
} BgwExecutorCtx;
8+
9+
static void BgwMainLoop(Datum arg)
10+
{
11+
BgwExecutorCtx* ctx = (BgwExecutorCtx*)arg;
12+
int id = ctx->id;
13+
BgwPool* pool = ctx->pool;
14+
int size;
15+
void* work;
16+
17+
BackgroundWorkerInitializeConnection(pool->dbname, NULL);
18+
19+
while(true) {
20+
PGSemaphoreLock(&pool->available);
21+
SpinLockAcquire(&pool->lock);
22+
Assert(pool->head != pool->tail);
23+
size = (int*)&pool->buf[pool->head];
24+
void* work = palloc(len);
25+
if (pool->head + size + 4 > pool->size) {
26+
memcpy(work, pool->buf, size);
27+
pool->head = (size & 3) & ~3;
28+
} else {
29+
memcpy(work, &pool->buf[pool->head+4], size);
30+
pool->head += 4 + ((size & 3) & ~3);
31+
}
32+
if (pool->size == pool->head) {
33+
pool->head = 0;
34+
}
35+
if (pool->producerBlocked) {
36+
PGSemaphoreUnlock(&pool->overflow);
37+
pool->producerBlocked = false;
38+
}
39+
SpinLockRelease(&pool->lock);
40+
pool->executor(id, work, size);
41+
pfree(work);
42+
}
43+
}
44+
45+
BGWPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t bufSize, size_t nWorkers);
46+
{
47+
int i;
48+
BackgroundWorker worker;
49+
BGWPool* pool = (BGWPool*)ShmemAlloc(bufSize + sizeof(BGWPool));
50+
pool->executor = executor;
51+
PGSemaphoreCreate(&pool->available);
52+
PGSemaphoreCreate(&pool->overflow);
53+
PGSemaphoreReset(&pool->available);
54+
PGSemaphoreReset(&pool->overflow);
55+
SpinLockInit(&pool->lock);
56+
pool->producerBlocked = false;
57+
pool->head = 0;
58+
pool->tail = 0;
59+
pool->size = bufSize;
60+
strcpy(pool->dbname, dbname);
61+
62+
MemSet(&worker, 0, sizeof(BackgroundWorker));
63+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
64+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
65+
worker.bgw_main = BgwPoolMainLoop;
66+
worker.bgw_restart_time = 10; /* Wait 10 seconds for restart before crash */
67+
68+
for (i = 0; i < nWorkers; i++) {
69+
BgwExecutorCtx* ctx = (BgwExecutorCtx*)malloc(sizeof(BgwExecutorCtx));
70+
snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_worker_%d", i+1);
71+
ctx->id = i;
72+
ctx->pool = pool;
73+
worker.bgw_main_arg = (Datum)ctx;
74+
RegisterBackgroundWorker(&worker);
75+
}
76+
return pool;
77+
}
78+
79+
void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
80+
{
81+
Assert(size+4 <= pool->size);
82+
83+
SpinLockAcquire(&pool->lock);
84+
while (true) {
85+
if ((pool->head < pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
86+
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
87+
{
88+
pool->producerBlocked = true;
89+
SpinLockRelease(&pool->lock);
90+
PGSemaphoreLock(&pool->overflow);
91+
SpinLockAcquire(&pool->lock);
92+
} else {
93+
*(int*)&pool->buf[pool->tail] = size;
94+
if (pool->size - pool->tail >= size + 4) {
95+
memcpy(&pool->buf[pool->tail+4], work, size);
96+
pool->tail += 4 + (size+3) & ~3;
97+
} else {
98+
memcpy(pool->buf, work, size);
99+
pool->tail = (size+3) & ~3;
100+
}
101+
PGSemaphoreUnlock(&pool->available);
102+
}
103+
}
104+
}
105+

contrib/multimaster/bgwpool.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#ifndef __BGWPOOL_H__
2+
#define __BGWPOOL_H__
3+
4+
typedef void(*BgwExecutor)(int id, void* work, size_t size);
5+
6+
#define MAX_DBNAME_LEN 30
7+
8+
typedef struct
9+
{
10+
BgwExecutor executor;
11+
volatile slock_t lock;
12+
PGSemaphoreData available;
13+
PGSemaphoreData overflow;
14+
size_t head;
15+
size_t tail;
16+
size_t size;
17+
bool producerBlocked;
18+
char dbname[MAX_DBNAME_LEN];
19+
char queue[1];
20+
} BgwPool;
21+
22+
BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, int nWorkers);
23+
24+
void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
25+
26+
#endif

contrib/multimaster/multimaster.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ typedef struct
5959
int nNodes;
6060
pg_atomic_uint32 nReceivers;
6161
bool initialized;
62+
6263
} DtmState;
6364

6465
typedef struct
@@ -142,6 +143,8 @@ bool MMDoReplication;
142143
static char* MMConnStrs;
143144
static int MMNodeId;
144145
static int MMNodes;
146+
static int MMQueueSize;
147+
static int MMWorkers;
145148

146149
static char* DtmHost;
147150
static int DtmPort;
@@ -837,6 +840,36 @@ _PG_init(void)
837840
RequestAddinShmemSpace(DTM_SHMEM_SIZE);
838841
RequestAddinLWLocks(2);
839842

843+
DefineCustomIntVariable(
844+
"multimaster.workers",
845+
"Number of multimaster executor workers per node",
846+
NULL,
847+
&MMWorkers,
848+
8,
849+
1,
850+
INT_MAX,
851+
PGC_BACKEND,
852+
0,
853+
NULL,
854+
NULL,
855+
NULL
856+
);
857+
858+
DefineCustomIntVariable(
859+
"multimaster.queue_size",
860+
"Multimaster queue size",
861+
NULL,
862+
&MMQueueSize,
863+
1024*1024,
864+
1024,
865+
INT_MAX,
866+
PGC_BACKEND,
867+
0,
868+
NULL,
869+
NULL,
870+
NULL
871+
);
872+
840873
DefineCustomIntVariable(
841874
"multimaster.local_xid_reserve",
842875
"Number of XIDs reserved by node for local transactions",
@@ -927,6 +960,8 @@ _PG_init(void)
927960
if (MMNodes < 2) {
928961
elog(ERROR, "Multimaster should have at least two nodes");
929962
}
963+
dtm->pool = BgwPoolCreate(MMExecutor, MMDatabaseName, MMQueueSize, MMWorkers);
964+
930965
if (DtmBufferSize != 0)
931966
{
932967
DtmGlobalConfig(NULL, DtmPort, Unix_socket_directories);

contrib/multimaster/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ extern void MMJoinTransaction(TransactionId xid);
1111
extern bool MMIsLocalTransaction(TransactionId xid);
1212
extern void MMReceiverStarted(void);
1313

14+
extern char const* MMDatabaseName;
15+
1416
#endif

contrib/multimaster/receiver_raw.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ static volatile sig_atomic_t got_sigterm = false;
4747
static volatile sig_atomic_t got_sighup = false;
4848

4949
/* GUC variables */
50-
static char *receiver_database;
5150
static int receiver_idle_time = 1;
5251
static bool receiver_sync_mode = true;
5352

@@ -215,7 +214,7 @@ receiver_raw_main(Datum main_arg)
215214
BackgroundWorkerUnblockSignals();
216215

217216
/* Connect to a database */
218-
BackgroundWorkerInitializeConnection(receiver_database, NULL);
217+
BackgroundWorkerInitializeConnection(MMDatabaseName, NULL);
219218

220219
/* Establish connection to remote server */
221220
conn = PQconnectdb(args->receiver_conn_string);
@@ -561,17 +560,17 @@ int MMStartReceivers(char* conns, int node_id)
561560
}
562561
if (++i != node_id) {
563562
ReceiverArgs* ctx = (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
564-
if (receiver_database == NULL) {
563+
if (MMDatabaseName == NULL) {
565564
char* dbname = strstr(conn_str, "dbname=");
566565
char* eon;
567566
int len;
568567
Assert(dbname != NULL);
569568
dbname += 7;
570569
eon = strchr(dbname, ' ');
571570
len = eon - dbname;
572-
receiver_database = (char*)malloc(len + 1);
573-
memcpy(receiver_database, dbname, len);
574-
receiver_database[len] = '\0';
571+
MMDatabaseName = (char*)malloc(len + 1);
572+
memcpy(MMDatabaseName, dbname, len);
573+
MMDatabaseName[len] = '\0';
575574
}
576575
*p = '\0';
577576
ctx->receiver_conn_string = conn_str;

0 commit comments

Comments
 (0)