Skip to content

Commit 151af9e

Browse files
committed
Fix bug in deadlock detection algorithm
1 parent 480cda0 commit 151af9e

File tree

5 files changed

+44
-24
lines changed

5 files changed

+44
-24
lines changed

contrib/multimaster/bgwpool.c

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,23 @@
1212

1313
typedef struct
1414
{
15-
BgwPool* pool;
15+
BgwPoolConstructor constructor;
1616
int id;
17-
} BgwExecutorCtx;
17+
} BgwPoolExecutorCtx;
1818

1919
static void BgwPoolMainLoop(Datum arg)
2020
{
21-
BgwExecutorCtx* ctx = (BgwExecutorCtx*)arg;
21+
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
2222
int id = ctx->id;
23-
BgwPool* pool = ctx->pool;
23+
BgwPool* pool = ctx->constructor();
2424
int size;
2525
void* work;
2626

27+
BackgroundWorkerUnblockSignals();
2728
BackgroundWorkerInitializeConnection(pool->dbname, NULL);
2829

30+
elog(WARNING, "Start background worker %d", id);
31+
2932
while(true) {
3033
PGSemaphoreLock(&pool->available);
3134
SpinLockAcquire(&pool->lock);
@@ -52,11 +55,9 @@ static void BgwPoolMainLoop(Datum arg)
5255
}
5356
}
5457

55-
BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, int nWorkers)
58+
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize)
5659
{
57-
int i;
58-
BackgroundWorker worker;
59-
BgwPool* pool = (BgwPool*)ShmemAlloc(queueSize + sizeof(BgwPool));
60+
pool->queue = (char*)ShmemAlloc(queueSize);
6061
pool->executor = executor;
6162
PGSemaphoreCreate(&pool->available);
6263
PGSemaphoreCreate(&pool->overflow);
@@ -68,22 +69,27 @@ BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
6869
pool->tail = 0;
6970
pool->size = queueSize;
7071
strcpy(pool->dbname, dbname);
72+
}
73+
74+
void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
75+
{
76+
int i;
77+
BackgroundWorker worker;
7178

7279
MemSet(&worker, 0, sizeof(BackgroundWorker));
7380
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
7481
worker.bgw_start_time = BgWorkerStart_ConsistentState;
7582
worker.bgw_main = BgwPoolMainLoop;
7683
worker.bgw_restart_time = 10; /* Wait 10 seconds for restart before crash */
77-
84+
7885
for (i = 0; i < nWorkers; i++) {
79-
BgwExecutorCtx* ctx = (BgwExecutorCtx*)malloc(sizeof(BgwExecutorCtx));
86+
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
8087
snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_worker_%d", i+1);
8188
ctx->id = i;
82-
ctx->pool = pool;
89+
ctx->constructor = constructor;
8390
worker.bgw_main_arg = (Datum)ctx;
8491
RegisterBackgroundWorker(&worker);
8592
}
86-
return pool;
8793
}
8894

8995
void BgwPoolExecute(BgwPool* pool, void* work, size_t size)

contrib/multimaster/bgwpool.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
#ifndef __BGWPOOL_H__
22
#define __BGWPOOL_H__
33

4-
typedef void(*BgwExecutor)(int id, void* work, size_t size);
4+
#include "storage/s_lock.h"
5+
#include "storage/spin.h"
6+
#include "storage/pg_sema.h"
7+
8+
typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
59

610
#define MAX_DBNAME_LEN 30
711

812
typedef struct
913
{
10-
BgwExecutor executor;
14+
BgwPoolExecutor executor;
1115
volatile slock_t lock;
1216
PGSemaphoreData available;
1317
PGSemaphoreData overflow;
1418
size_t head;
1519
size_t tail;
1620
size_t size;
17-
bool producerBlocked;
18-
char dbname[MAX_DBNAME_LEN];
19-
char queue[1];
21+
bool producerBlocked;
22+
char dbname[MAX_DBNAME_LEN];
23+
char* queue;
2024
} BgwPool;
2125

22-
extern BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSize, int nWorkers);
26+
typedef BgwPool*(*BgwPoolConstructor)(void);
27+
28+
extern void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor);
29+
30+
extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize);
2331

2432
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
2533

contrib/multimaster/dtmd/src/ddd.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ void addSubgraph(Graph* graph, nodeid_t node_id, xid_t* xids, int n_xids)
116116
if (--e->dst->nIncomingEdges == 0 && l2_list_is_empty(&e->dst->outgoingEdges)) {
117117
freeVertex(graph, e->dst);
118118
}
119-
if (e->src->nIncomingEdges == 0 && l2_list_is_empty(&e->src->outgoingEdges)) {
119+
if (e->dst != e->src && e->src->nIncomingEdges == 0 && l2_list_is_empty(&e->src->outgoingEdges)) {
120120
freeVertex(graph, e->src);
121121
}
122122
freeEdge(graph, e);

contrib/multimaster/multimaster.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ typedef struct
6161
int nNodes;
6262
pg_atomic_uint32 nReceivers;
6363
bool initialized;
64-
BgwPool* pool;
64+
BgwPool pool;
6565
} DtmState;
6666

6767
typedef struct
@@ -104,6 +104,7 @@ static void DtmBackgroundWorker(Datum arg);
104104

105105
static void MMMarkTransAsLocal(TransactionId xid);
106106
static void MMExecutor(int id, void* work, size_t size);
107+
static BgwPool* MMPoolConstructor(void);
107108

108109
static shmem_startup_hook_type prev_shmem_startup_hook;
109110

@@ -716,7 +717,7 @@ static void DtmInitialize()
716717
dtm->nNodes = MMNodes;
717718
pg_atomic_write_u32(&dtm->nReceivers, 0);
718719
dtm->initialized = false;
719-
dtm->pool = BgwPoolCreate(MMExecutor, MMDatabaseName, MMQueueSize, MMWorkers);
720+
BgwPoolInit(&dtm->pool, MMExecutor, MMDatabaseName, MMQueueSize);
720721
RegisterXactCallback(DtmXactCallback, NULL);
721722
}
722723
LWLockRelease(AddinShmemInitLock);
@@ -953,6 +954,7 @@ _PG_init(void)
953954
if (MMNodes < 2) {
954955
elog(ERROR, "Multimaster should have at least two nodes");
955956
}
957+
BgwPoolStart(MMWorkers, MMPoolConstructor);
956958

957959
if (DtmBufferSize != 0)
958960
{
@@ -1203,7 +1205,6 @@ static void MMExecutor(int id, void* work, size_t size)
12031205
SPI_finish();
12041206
PopActiveSnapshot();
12051207
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE) {
1206-
FlushErrorState();
12071208
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
12081209
id, xid)));
12091210
AbortCurrentTransaction();
@@ -1213,6 +1214,7 @@ static void MMExecutor(int id, void* work, size_t size)
12131214
}
12141215
PG_CATCH();
12151216
{
1217+
FlushErrorState();
12161218
if (rc == SPI_ERROR_TRANSACTION) {
12171219
SPI_finish();
12181220
PopActiveSnapshot();
@@ -1224,6 +1226,10 @@ static void MMExecutor(int id, void* work, size_t size)
12241226

12251227
extern void MMExecute(void* work, int size)
12261228
{
1227-
BgwPoolExecute(dtm->pool, work, size);
1229+
BgwPoolExecute(&dtm->pool, work, size);
12281230
}
12291231

1232+
static BgwPool* MMPoolConstructor(void)
1233+
{
1234+
return &dtm->pool;
1235+
}

contrib/pg_dtm/dtmd/src/ddd.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ void addSubgraph(Graph* graph, nodeid_t node_id, xid_t* xids, int n_xids)
116116
if (--e->dst->nIncomingEdges == 0 && l2_list_is_empty(&e->dst->outgoingEdges)) {
117117
freeVertex(graph, e->dst);
118118
}
119-
if (e->src->nIncomingEdges == 0 && l2_list_is_empty(&e->src->outgoingEdges)) {
119+
if (e->dst != e->src && e->src->nIncomingEdges == 0 && l2_list_is_empty(&e->src->outgoingEdges)) {
120120
freeVertex(graph, e->src);
121121
}
122122
freeEdge(graph, e);

0 commit comments

Comments
 (0)