Skip to content

Commit c19018d

Browse files
committed
Add bgwpool
1 parent 13682d8 commit c19018d

File tree

8 files changed

+150
-112
lines changed

8 files changed

+150
-112
lines changed

contrib/multimaster/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o receiver_raw.o decoder_raw.o libdtm.o sockhub/sockhub.o
2+
OBJS = multimaster.o receiver_raw.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
33

44
EXTENSION = multimaster
55
DATA = multimaster--1.0.sql

contrib/multimaster/bgwpool.c

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
#include "postgres.h"
2+
#include "fmgr.h"
3+
#include "miscadmin.h"
4+
#include "postmaster/postmaster.h"
5+
#include "postmaster/bgworker.h"
6+
#include "storage/s_lock.h"
7+
#include "storage/spin.h"
8+
#include "storage/pg_sema.h"
9+
#include "storage/shmem.h"
10+
111
#include "bgwpool.h"
212

313
typedef struct
@@ -6,7 +16,7 @@ typedef struct
616
int id;
717
} BgwExecutorCtx;
818

9-
static void BgwMainLoop(Datum arg)
19+
static void BgwPoolMainLoop(Datum arg)
1020
{
1121
BgwExecutorCtx* ctx = (BgwExecutorCtx*)arg;
1222
int id = ctx->id;
@@ -19,34 +29,34 @@ static void BgwMainLoop(Datum arg)
1929
while(true) {
2030
PGSemaphoreLock(&pool->available);
2131
SpinLockAcquire(&pool->lock);
22-
Assert(pool->head != pool->tail);
23-
size = (int*)&pool->queue[pool->head];
24-
void* work = palloc(len);
32+
size = *(int*)&pool->queue[pool->head];
33+
Assert(size < pool->size);
34+
work = palloc(size);
2535
if (pool->head + size + 4 > pool->size) {
2636
memcpy(work, pool->queue, size);
27-
pool->head = (size & 3) & ~3;
37+
pool->head = INTALIGN(size);
2838
} else {
2939
memcpy(work, &pool->queue[pool->head+4], size);
30-
pool->head += 4 + ((size & 3) & ~3);
40+
pool->head += 4 + INTALIGN(size);
3141
}
3242
if (pool->size == pool->head) {
3343
pool->head = 0;
3444
}
3545
if (pool->producerBlocked) {
36-
PGSemaphoreUnlock(&pool->overflow);
3746
pool->producerBlocked = false;
47+
PGSemaphoreUnlock(&pool->overflow);
3848
}
3949
SpinLockRelease(&pool->lock);
4050
pool->executor(id, work, size);
4151
pfree(work);
4252
}
4353
}
4454

45-
BGWPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers);
55+
BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, int nWorkers)
4656
{
4757
int i;
4858
BackgroundWorker worker;
49-
BGWPool* pool = (BGWPool*)ShmemAlloc(queueSize + sizeof(BGWPool));
59+
BgwPool* pool = (BgwPool*)ShmemAlloc(queueSize + sizeof(BgwPool));
5060
pool->executor = executor;
5161
PGSemaphoreCreate(&pool->available);
5262
PGSemaphoreCreate(&pool->overflow);
@@ -76,13 +86,13 @@ BGWPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
7686
return pool;
7787
}
7888

79-
void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
89+
void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
8090
{
8191
Assert(size+4 <= pool->size);
8292

8393
SpinLockAcquire(&pool->lock);
8494
while (true) {
85-
if ((pool->head < pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
95+
if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
8696
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
8797
{
8898
pool->producerBlocked = true;
@@ -93,13 +103,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
93103
*(int*)&pool->queue[pool->tail] = size;
94104
if (pool->size - pool->tail >= size + 4) {
95105
memcpy(&pool->queue[pool->tail+4], work, size);
96-
pool->tail += 4 + (size+3) & ~3;
106+
pool->tail += 4 + INTALIGN(size);
97107
} else {
98108
memcpy(pool->queue, work, size);
99-
pool->tail = (size+3) & ~3;
109+
pool->tail = INTALIGN(size);
110+
}
111+
if (pool->tail == pool->size) {
112+
pool->tail = 0;
100113
}
101114
PGSemaphoreUnlock(&pool->available);
115+
break;
102116
}
103117
}
118+
SpinLockRelease(&pool->lock);
104119
}
105120

contrib/multimaster/bgwpool.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ typedef struct
1919
char queue[1];
2020
} BgwPool;
2121

22-
BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, int nWorkers);
22+
extern BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, int nWorkers);
2323

24-
void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
24+
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
2525

2626
#endif

contrib/multimaster/bytebuf.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include "postgres.h"
2+
#include "bytebuf.h"
3+
4+
#define INIT_BUF_SIZE 1024
5+
6+
void ByteBufferAlloc(ByteBuffer* buf)
7+
{
8+
buf->size = INIT_BUF_SIZE;
9+
buf->data = palloc(buf->size);
10+
buf->used = 0;
11+
}
12+
13+
void ByteBufferAppend(ByteBuffer* buf, void* data, int len)
14+
{
15+
if (buf->used + len > buf->size) {
16+
buf->size = buf->used + len > buf->size*2 ? buf->used + len : buf->size*2;
17+
buf->data = (char*)repalloc(buf->data, buf->size);
18+
}
19+
memcpy(&buf->data[buf->used], data, len);
20+
buf->used += len;
21+
}
22+
23+
void ByteBufferAppendInt32(ByteBuffer* buf, int data)
24+
{
25+
ByteBufferAppend(buf, &data, sizeof data);
26+
}
27+
28+
void ByteBufferFree(ByteBuffer* buf)
29+
{
30+
pfree(buf->data);
31+
}
32+
33+
void ByteBufferReset(ByteBuffer* buf)
34+
{
35+
buf->used = 0;
36+
}

contrib/multimaster/bytebuf.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#ifndef __BYTEBUF_H__
2+
#define __BYTEBUF_H__
3+
4+
typedef struct
5+
{
6+
char* data;
7+
int size;
8+
int used;
9+
} ByteBuffer;
10+
11+
extern void ByteBufferAlloc(ByteBuffer* buf);
12+
extern void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
13+
extern void ByteBufferAppendInt32(ByteBuffer* buf, int data);
14+
extern void ByteBufferFree(ByteBuffer* buf);
15+
extern void ByteBufferReset(ByteBuffer* buf);
16+
17+
#endif

contrib/multimaster/multimaster.c

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "access/xlog.h"
2929
#include "storage/proc.h"
3030
#include "storage/procarray.h"
31+
#include "executor/spi.h"
3132
#include "executor/executor.h"
3233
#include "access/twophase.h"
3334
#include "utils/guc.h"
@@ -48,6 +49,7 @@
4849

4950
#include "libdtm.h"
5051
#include "multimaster.h"
52+
#include "bgwpool.h"
5153

5254
typedef struct
5355
{
@@ -59,16 +61,9 @@ typedef struct
5961
int nNodes;
6062
pg_atomic_uint32 nReceivers;
6163
bool initialized;
62-
64+
BgwPool* pool;
6365
} DtmState;
6466

65-
typedef struct
66-
{
67-
char* data;
68-
int size;
69-
int used;
70-
} ByteBuffer;
71-
7267
typedef struct
7368
{
7469
TransactionId xid;
@@ -107,12 +102,8 @@ static bool TransactionIdIsInDoubt(TransactionId xid);
107102
static void DtmShmemStartup(void);
108103
static void DtmBackgroundWorker(Datum arg);
109104

110-
static void ByteBufferAlloc(ByteBuffer* buf);
111-
static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
112-
static void ByteBufferAppendInt32(ByteBuffer* buf, int data);
113-
static void ByteBufferFree(ByteBuffer* buf);
114-
115105
static void MMMarkTransAsLocal(TransactionId xid);
106+
static void MMExecutor(int id, void* work, size_t size);
116107

117108
static shmem_startup_hook_type prev_shmem_startup_hook;
118109

@@ -139,6 +130,7 @@ static TransactionManager DtmTM = {
139130
};
140131

141132
bool MMDoReplication;
133+
char* MMDatabaseName;
142134

143135
static char* MMConnStrs;
144136
static int MMNodeId;
@@ -147,8 +139,8 @@ static int MMQueueSize;
147139
static int MMWorkers;
148140

149141
static char* DtmHost;
150-
static int DtmPort;
151-
static int DtmBufferSize;
142+
static int DtmPort;
143+
static int DtmBufferSize;
152144

153145
static ExecutorFinish_hook_type PreviousExecutorFinishHook = NULL;
154146
static void MMExecutorFinish(QueryDesc *queryDesc);
@@ -1092,33 +1084,6 @@ void DtmBackgroundWorker(Datum arg)
10921084
ShubLoop(&shub);
10931085
}
10941086

1095-
static void ByteBufferAlloc(ByteBuffer* buf)
1096-
{
1097-
buf->size = 1024;
1098-
buf->data = palloc(buf->size);
1099-
buf->used = 0;
1100-
}
1101-
1102-
static void ByteBufferAppend(ByteBuffer* buf, void* data, int len)
1103-
{
1104-
if (buf->used + len > buf->size) {
1105-
buf->size = buf->used + len > buf->size*2 ? buf->used + len : buf->size*2;
1106-
buf->data = (char*)repalloc(buf->data, buf->size);
1107-
}
1108-
memcpy(&buf->data[buf->used], data, len);
1109-
buf->used += len;
1110-
}
1111-
1112-
static void ByteBufferAppendInt32(ByteBuffer* buf, int data)
1113-
{
1114-
ByteBufferAppend(buf, &data, sizeof data);
1115-
}
1116-
1117-
static void ByteBufferFree(ByteBuffer* buf)
1118-
{
1119-
pfree(buf->data);
1120-
}
1121-
11221087
static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
11231088
{
11241089
ByteBuffer* buf = (ByteBuffer*)arg;
@@ -1219,3 +1184,45 @@ MMExecutorFinish(QueryDesc *queryDesc)
12191184
}
12201185
}
12211186

1187+
static void MMExecutor(int id, void* work, size_t size)
1188+
{
1189+
TransactionId xid = *(TransactionId*)work;
1190+
char* stmts = (char*)work + 4;
1191+
int rc = SPI_ERROR_TRANSACTION;
1192+
MMJoinTransaction(xid);
1193+
1194+
SetCurrentStatementStartTimestamp();
1195+
StartTransactionCommand();
1196+
SPI_connect();
1197+
PushActiveSnapshot(GetTransactionSnapshot());
1198+
1199+
PG_TRY();
1200+
{
1201+
rc = SPI_execute(stmts, false, 0);
1202+
SPI_finish();
1203+
PopActiveSnapshot();
1204+
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE) {
1205+
FlushErrorState();
1206+
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
1207+
id, xid)));
1208+
AbortCurrentTransaction();
1209+
} else {
1210+
CommitTransactionCommand();
1211+
}
1212+
}
1213+
PG_CATCH();
1214+
{
1215+
if (rc == SPI_ERROR_TRANSACTION) {
1216+
SPI_finish();
1217+
PopActiveSnapshot();
1218+
}
1219+
AbortCurrentTransaction();
1220+
}
1221+
PG_END_TRY();
1222+
}
1223+
1224+
extern void MMExecute(void* work, int size)
1225+
{
1226+
BgwPoolExecute(dtm->pool, work, size);
1227+
}
1228+

contrib/multimaster/multimaster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef __MULTIMASTER_H__
22
#define __MULTIMASTER_H__
33

4+
#include "bytebuf.h"
5+
46
#define XTM_TRACE(fmt, ...)
57
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
68
//#define XTM_INFO(fmt, ...)
@@ -10,7 +12,8 @@ extern void MMBeginTransaction(void);
1012
extern void MMJoinTransaction(TransactionId xid);
1113
extern bool MMIsLocalTransaction(TransactionId xid);
1214
extern void MMReceiverStarted(void);
15+
extern void MMExecute(void* work, int size);
1316

14-
extern char const* MMDatabaseName;
17+
extern char* MMDatabaseName;
1518

1619
#endif

0 commit comments

Comments
 (0)