Skip to content

Commit 33e709e

Browse files
committed
Update multimaster
1 parent 96f798c commit 33e709e

File tree

13 files changed

+766
-179
lines changed

13 files changed

+766
-179
lines changed

contrib/multimaster/decoder_raw.c

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,20 @@ decoder_raw_shutdown(LogicalDecodingContext *ctx)
9999
}
100100

101101
/* BEGIN callback */
102+
static TransactionId lastXid;
103+
102104
static void
103105
decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
104106
{
105107
DecoderRawData *data = ctx->output_plugin_private;
106-
108+
Assert(lastXid != txn->xid);
109+
lastXid = txn->xid;
107110
if (MMIsLocalTransaction(txn->xid)) {
111+
XTM_INFO("Skip local transaction %u\n", txn->xid);
108112
data->isLocal = true;
109113
} else {
110114
OutputPluginPrepareWrite(ctx, true);
111-
elog(WARNING, "Send transation to %u to replica", txn->xid);
115+
XTM_INFO("Send transaction %u to replica\n", txn->xid);
112116
appendStringInfo(ctx->out, "BEGIN %u;", txn->xid);
113117
OutputPluginWrite(ctx, true);
114118
data->isLocal = false;
@@ -122,10 +126,12 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
122126
{
123127
DecoderRawData *data = ctx->output_plugin_private;
124128
if (!data->isLocal) {
125-
elog(WARNING, "Send commit of %u to replica", txn->xid);
129+
XTM_INFO("Send commit of transaction %u to replica\n", txn->xid);
126130
OutputPluginPrepareWrite(ctx, true);
127131
appendStringInfoString(ctx->out, "COMMIT;");
128132
OutputPluginWrite(ctx, true);
133+
} else {
134+
XTM_INFO("Skip commit of transaction %u\n", txn->xid);
129135
}
130136
}
131137

@@ -291,7 +297,7 @@ print_where_clause(StringInfo s,
291297
int key;
292298

293299
/* Use all the values associated with the index */
294-
indexRel = index_open(relation->rd_replidindex, ShareLock);
300+
indexRel = index_open(relation->rd_replidindex, AccessShareLock);
295301
for (key = 0; key < indexRel->rd_index->indnatts; key++)
296302
{
297303
int relattr = indexRel->rd_index->indkey.values[key];
@@ -477,9 +483,10 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
477483

478484
data = ctx->output_plugin_private;
479485
if (data->isLocal) {
486+
XTM_INFO("Skip action %d in transaction %u\n", change->action, txn->xid);
480487
return;
481488
}
482-
elog(WARNING, "Send action %d in transaction %u to replica", change->action, txn->xid);
489+
XTM_INFO("Send action %d in transaction %u to replica\n", change->action, txn->xid);
483490

484491
/* Avoid leaking memory by using and resetting our own context */
485492
old = MemoryContextSwitchTo(data->context);

contrib/multimaster/dtmd/src/main.c

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -357,13 +357,13 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
357357
// Check the arguments
358358
xid_t xid = argv[1];
359359
bool wait = argv[2];
360-
360+
#if 0
361361
CHECK(
362362
CLIENT_XID(client) == xid,
363363
client,
364364
"VOTE: voting for a transaction not participated in"
365365
);
366-
366+
#endif
367367
Transaction *t = find_transaction(xid);
368368
if (t == NULL) {
369369
shout(
@@ -392,6 +392,10 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
392392
client,
393393
"VOTE: transaction failed to abort O_o"
394394
);
395+
shout(
396+
"[%d] VOTE: abort xid %u\n",
397+
CLIENT_ID(client), xid
398+
);
395399

396400
notify_listeners(t, NEGATIVE);
397401
free_transaction(t);
@@ -427,45 +431,43 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
427431
}
428432

429433
static void onsnapshot(client_t client, int argc, xid_t *argv) {
434+
Snapshot snapshot_now;
430435
CHECK(
431436
argc == 2,
432437
client,
433438
"SNAPSHOT: wrong number of arguments"
434439
);
435440

436441
xid_t xid = argv[1];
437-
442+
Snapshot *snap;
438443
Transaction *t = find_transaction(xid);
439444
if (t == NULL) {
440445
shout(
441-
"[%d] SNAPSHOT: xid %u not found\n",
446+
"[%d] SNAPSHOT: xid %u not found: use curent snapshot\n",
442447
CLIENT_ID(client), xid
443448
);
444-
client_message_shortcut(client, RES_FAILED);
445-
return;
446-
}
447-
448-
if (CLIENT_XID(client) == INVALID_XID) {
449-
CLIENT_SNAPSENT(client) = 0;
450-
CLIENT_XID(client) = t->xid;
451-
}
452-
453-
CHECK(
454-
CLIENT_XID(client) == t->xid,
455-
client,
456-
"SNAPSHOT: getting snapshot for a transaction not participated in"
457-
);
458-
459-
assert(CLIENT_SNAPSENT(client) <= t->snapshots_count); // who sent an inexistent snapshot?!
460-
461-
if (CLIENT_SNAPSENT(client) == t->snapshots_count) {
462-
// a fresh snapshot is needed
463-
gen_snapshot(transaction_next_snapshot(t));
464-
}
465-
466-
Snapshot *snap = transaction_snapshot(t, CLIENT_SNAPSENT(client)++);
467-
snap->times_sent += 1; // FIXME: does times_sent get used anywhere? see also 4765234987
468-
449+
gen_snapshot(&snapshot_now);
450+
snap = &snapshot_now;
451+
} else {
452+
if (CLIENT_XID(client) == INVALID_XID) {
453+
CLIENT_SNAPSENT(client) = 0;
454+
CLIENT_XID(client) = t->xid;
455+
}
456+
CHECK(
457+
CLIENT_XID(client) == t->xid,
458+
client,
459+
"SNAPSHOT: getting snapshot for a transaction not participated in"
460+
);
461+
assert(CLIENT_SNAPSENT(client) <= t->snapshots_count); // who sent an inexistent snapshot?!
462+
463+
if (CLIENT_SNAPSENT(client) == t->snapshots_count) {
464+
// a fresh snapshot is needed
465+
gen_snapshot(transaction_next_snapshot(t));
466+
}
467+
468+
snap = transaction_snapshot(t, CLIENT_SNAPSENT(client)++);
469+
snap->times_sent += 1; // FIXME: does times_sent get used anywhere? see also 4765234987
470+
}
469471
xid_t ok = RES_OK;
470472
client_message_start(client); {
471473
client_message_append(client, sizeof(xid_t), &ok);

contrib/multimaster/dtmd/src/transaction.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ typedef struct list_node_t {
1111

1212
int transaction_status(Transaction *t) {
1313
assert(t->votes_for + t->votes_against <= t->size);
14-
14+
#if 0 /* report ABORTED status immediately: do not wait for all responses */
1515
if (t->votes_for + t->votes_against < t->size) {
1616
return DOUBT;
1717
}
18-
19-
if (t->votes_against) {
20-
return NEGATIVE;
21-
} else {
22-
return POSITIVE;
23-
}
18+
#endif
19+
return t->votes_against
20+
? NEGATIVE
21+
: (t->votes_for == t->size)
22+
? POSITIVE
23+
: DOUBT;
2424
}
2525

2626
void transaction_clear(Transaction *t) {

contrib/multimaster/init.sql

Lines changed: 0 additions & 2 deletions
This file was deleted.

contrib/multimaster/multimaster.c

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ static char* DtmHost;
148148
static int DtmPort;
149149
static int DtmBufferSize;
150150

151-
static ExecutorRun_hook_type PreviousExecutorRunHook = NULL;
152-
static void MMExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
151+
static ExecutorFinish_hook_type PreviousExecutorFinishHook = NULL;
152+
static void MMExecutorFinish(QueryDesc *queryDesc);
153153
static bool MMIsDistributedTrans;
154154

155155
static BackgroundWorker DtmWorker = {
@@ -160,9 +160,6 @@ static BackgroundWorker DtmWorker = {
160160
DtmBackgroundWorker
161161
};
162162

163-
#define XTM_TRACE(fmt, ...)
164-
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
165-
//#define XTM_INFO(fmt, ...)
166163

167164
static void DumpSnapshot(Snapshot s, char *name)
168165
{
@@ -235,7 +232,10 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
235232
TransactionId xid;
236233
Snapshot src = &DtmSnapshot;
237234

238-
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
235+
if (!(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax))) {
236+
PgGetSnapshotData(dst);
237+
return;
238+
}
239239

240240
GetLocalSnapshot:
241241
/*
@@ -667,8 +667,11 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
667667
hash_search(xid_in_doubt, &DtmNextXid, HASH_ENTER, NULL);
668668
LWLockRelease(dtm->hashLock);
669669
if (DtmGlobalSetTransStatus(xid, status, true) != status) {
670-
END_CRIT_SECTION();
670+
DtmNextXid = InvalidTransactionId;
671+
DtmLastSnapshot = NULL;
672+
MMIsDistributedTrans = false;
671673
MarkAsAborted();
674+
END_CRIT_SECTION();
672675
elog(ERROR, "Transaction commit rejected by XTM");
673676
}
674677
XTM_INFO("Commit transaction %d\n", xid);
@@ -763,7 +766,10 @@ DtmXactCallback(XactEvent event, void *arg)
763766
case XACT_EVENT_PRE_COMMIT:
764767
case XACT_EVENT_PARALLEL_PRE_COMMIT:
765768
if (!MMIsDistributedTrans && TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
769+
XTM_INFO("%d: Will ignore transaction %u\n", getpid(), GetCurrentTransactionIdIfAny());
766770
MMMarkTransAsLocal(GetCurrentTransactionIdIfAny());
771+
} else {
772+
XTM_INFO("%d: Transaction %u will be replicated\n", getpid(), GetCurrentTransactionIdIfAny());
767773
}
768774
break;
769775
case XACT_EVENT_COMMIT:
@@ -794,9 +800,9 @@ DtmXactCallback(XactEvent event, void *arg)
794800
}
795801
DtmNextXid = InvalidTransactionId;
796802
DtmLastSnapshot = NULL;
797-
MMIsDistributedTrans = false;
798-
break;
799-
}
803+
}
804+
MMIsDistributedTrans = false;
805+
break;
800806
default:
801807
break;
802808
}
@@ -933,8 +939,8 @@ _PG_init(void)
933939
prev_shmem_startup_hook = shmem_startup_hook;
934940
shmem_startup_hook = DtmShmemStartup;
935941

936-
PreviousExecutorRunHook = ExecutorRun_hook;
937-
ExecutorRun_hook = MMExecutorRun;
942+
PreviousExecutorFinishHook = ExecutorFinish_hook;
943+
ExecutorFinish_hook = MMExecutorFinish;
938944
}
939945

940946
/*
@@ -944,7 +950,7 @@ void
944950
_PG_fini(void)
945951
{
946952
shmem_startup_hook = prev_shmem_startup_hook;
947-
ExecutorRun_hook = PreviousExecutorRunHook;
953+
ExecutorFinish_hook = PreviousExecutorFinishHook;
948954
}
949955

950956

@@ -1021,6 +1027,7 @@ bool MMIsLocalTransaction(TransactionId xid)
10211027
lt = hash_search(local_trans, &xid, HASH_FIND, NULL);
10221028
if (lt != NULL) {
10231029
result = true;
1030+
Assert(lt->count > 0);
10241031
if (--lt->count == 0) {
10251032
hash_search(local_trans, &xid, HASH_REMOVE, NULL);
10261033
}
@@ -1154,19 +1161,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
11541161
}
11551162

11561163
static void
1157-
MMExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
1164+
MMExecutorFinish(QueryDesc *queryDesc)
11581165
{
11591166
if (MMDoReplication) {
11601167
CmdType operation = queryDesc->operation;
1161-
MMIsDistributedTrans |= operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE;
1168+
EState *estate = queryDesc->estate;
1169+
if (estate->es_processed != 0) {
1170+
MMIsDistributedTrans |= operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE;
1171+
}
11621172
}
1163-
if (PreviousExecutorRunHook != NULL)
1173+
if (PreviousExecutorFinishHook != NULL)
11641174
{
1165-
PreviousExecutorRunHook(queryDesc, direction, count);
1175+
PreviousExecutorFinishHook(queryDesc);
11661176
}
11671177
else
11681178
{
1169-
standard_ExecutorRun(queryDesc, direction, count);
1179+
standard_ExecutorFinish(queryDesc);
11701180
}
11711181
}
11721182

contrib/multimaster/multimaster.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
#ifndef __MULTIMASTER_H__
22
#define __MULTIMASTER_H__
33

4+
#define XTM_TRACE(fmt, ...)
5+
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
6+
//#define XTM_INFO(fmt, ...)
7+
48
extern int MMStartReceivers(char* nodes, int node_id);
59
extern void MMBeginTransaction(void);
610
extern void MMJoinTransaction(TransactionId xid);

contrib/multimaster/receiver_raw.c

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ receiver_raw_main(Datum main_arg)
210210
PGconn *conn;
211211
PGresult *res;
212212
bool insideTrans = false;
213-
213+
bool rollbackTransaction = false;
214+
214215
/* Register functions for SIGTERM/SIGHUP management */
215216
pqsignal(SIGHUP, receiver_raw_sighup);
216217
pqsignal(SIGTERM, receiver_raw_sigterm);
@@ -414,37 +415,47 @@ receiver_raw_main(Datum main_arg)
414415
int rc = sscanf(stmt + 6, "%u", &xid);
415416
Assert(rc == 1);
416417
Assert(!insideTrans);
417-
elog(WARNING, "Receiver begin transaction %u", xid);
418418
SetCurrentStatementStartTimestamp();
419+
MMJoinTransaction(xid);
419420
StartTransactionCommand();
420421
SPI_connect();
421422
PushActiveSnapshot(GetTransactionSnapshot());
422-
MMJoinTransaction(xid);
423423
insideTrans = true;
424+
rollbackTransaction = false;
424425
} else if (strncmp(stmt, "COMMIT;", 7) == 0) {
425-
elog(WARNING, "Receiver commit transaction");
426426
Assert(insideTrans);
427427
insideTrans = false;
428428
SPI_finish();
429429
PopActiveSnapshot();
430-
CommitTransactionCommand();
431-
} else {
430+
if (rollbackTransaction) {
431+
AbortCurrentTransaction();
432+
} else {
433+
CommitTransactionCommand();
434+
}
435+
} else if (!rollbackTransaction) {
432436
Assert(insideTrans);
433437
/* Execute query */
434-
rc = SPI_execute(stmt, false, 0);
435-
436-
if (rc == SPI_OK_INSERT)
437-
ereport(LOG, (errmsg("%s: INSERT received correctly: %s",
438-
worker_name, stmt)));
439-
else if (rc == SPI_OK_UPDATE)
440-
ereport(LOG, (errmsg("%s: UPDATE received correctly: %s",
441-
worker_name, stmt)));
442-
else if (rc == SPI_OK_DELETE)
443-
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
444-
worker_name, stmt)));
445-
else
446-
ereport(LOG, (errmsg("%s: Error when applying change: %s",
447-
worker_name, stmt)));
438+
PG_TRY();
439+
{
440+
rc = SPI_execute(stmt, false, 0);
441+
if (rc == SPI_OK_INSERT)
442+
ereport(LOG, (errmsg("%s: INSERT received correctly: %s",
443+
worker_name, stmt)));
444+
else if (rc == SPI_OK_UPDATE)
445+
ereport(LOG, (errmsg("%s: UPDATE received correctly: %s",
446+
worker_name, stmt)));
447+
else if (rc == SPI_OK_DELETE)
448+
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
449+
worker_name, stmt)));
450+
else
451+
ereport(LOG, (errmsg("%s: Error when applying change: %s",
452+
worker_name, stmt)));
453+
}
454+
PG_CATCH();
455+
{
456+
rollbackTransaction = true;
457+
}
458+
PG_END_TRY();
448459
}
449460
/* Update written position */
450461
output_written_lsn = Max(walEnd, output_written_lsn);

contrib/multimaster/tests/core

-90.7 MB
Binary file not shown.

contrib/multimaster/tests/dtmbench

-46.8 KB
Binary file not shown.

0 commit comments

Comments
 (0)