Skip to content

Commit 8c1b0b7

Browse files
committed
Merge branch 'twophase_decoding'
2 parents 27bba5c + 26dd907 commit 8c1b0b7

File tree

5 files changed

+96
-9
lines changed

5 files changed

+96
-9
lines changed

contrib/mmts/arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ static bool MtmRecovery()
555555
}
556556
}
557557
}
558-
return recorvered;
558+
return recovered;
559559
}
560560
#endif
561561

@@ -602,8 +602,8 @@ static void MtmTransReceiver(Datum arg)
602602
do {
603603
struct timeval tv;
604604
events = inset;
605-
tv.tv_sec = MtmKeepAliveTimeout/USEC;
606-
tv.tv_usec = MtmKeepAliveTimeout%USEC;
605+
tv.tv_sec = MtmKeepaliveTimeout/USEC;
606+
tv.tv_usec = MtmKeepaliveTimeout%USEC;
607607
n = select(max_fd+1, &events, NULL, NULL, &tv);
608608
} while (n < 0 && MtmRecovery());
609609

contrib/mmts/multimaster.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
typedef uint64 csn_t; /* commit serial number */
3030
#define INVALID_CSN ((csn_t)-1)
3131

32+
#define PGLOGICAL_COMMIT 0x00
33+
#define PGLOGICAL_PREPARE 0x01
34+
#define PGLOGICAL_COMMIT_PREPARED 0x02
35+
#define PGLOGICAL_ABORT_PREPARED 0x03
36+
37+
#define PGLOGICAL_XACT_EVENT(flags) (flags & 0x03)
38+
3239
typedef uint64 timestamp_t;
3340

3441
/* Identifier of global transaction */

contrib/mmts/pglogical_apply.c

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,61 @@ read_rel(StringInfo s, LOCKMODE mode)
464464
}
465465

466466
static void
467-
process_remote_commit(StringInfo s)
467+
process_remote_commit(StringInfo in)
468468
{
469-
CommitTransactionCommand();
469+
XLogRecPtr commit_lsn;
470+
XLogRecPtr end_lsn;
471+
TimestampTz commit_time;
472+
uint8 flags;
473+
const char *gid;
474+
475+
/* read flags */
476+
flags = pq_getmsgbyte(in);
477+
478+
/* read fields */
479+
commit_lsn = pq_getmsgint64(in);
480+
end_lsn = pq_getmsgint64(in);
481+
commit_time = pq_getmsgint64(in);
482+
483+
if (PGLOGICAL_XACT_EVENT(flags) != PGLOGICAL_COMMIT)
484+
gid = pq_getmsgstring(in);
485+
486+
switch(PGLOGICAL_XACT_EVENT(flags))
487+
{
488+
case PGLOGICAL_COMMIT:
489+
{
490+
if (IsTransactionState())
491+
CommitTransactionCommand();
492+
break;
493+
}
494+
case PGLOGICAL_PREPARE:
495+
{
496+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
497+
BeginTransactionBlock();
498+
CommitTransactionCommand();
499+
StartTransactionCommand();
500+
/* PREPARE itself */
501+
PrepareTransactionBlock(gid);
502+
CommitTransactionCommand();
503+
break;
504+
}
505+
case PGLOGICAL_COMMIT_PREPARED:
506+
{
507+
StartTransactionCommand();
508+
FinishPreparedTransaction(gid, true);
509+
CommitTransactionCommand();
510+
break;
511+
}
512+
case PGLOGICAL_ABORT_PREPARED:
513+
{
514+
StartTransactionCommand();
515+
FinishPreparedTransaction(gid, false);
516+
CommitTransactionCommand();
517+
break;
518+
}
519+
default:
520+
Assert(false);
521+
}
470522
}
471523

472524
static void

contrib/mmts/pglogical_proto.c

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,37 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
127127
ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
128128
{
129129
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
130-
if (!mm->isLocal) {
131-
pq_sendbyte(out, 'C'); /* sending COMMIT */
132-
}
130+
uint8 flags = 0;
131+
132+
if (mm->isLocal)
133+
return;
134+
135+
pq_sendbyte(out, 'C'); /* sending COMMIT */
136+
137+
if (txn->xact_action == XLOG_XACT_COMMIT)
138+
flags = PGLOGICAL_COMMIT;
139+
else if (txn->xact_action == XLOG_XACT_PREPARE)
140+
flags = PGLOGICAL_PREPARE;
141+
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
142+
flags = PGLOGICAL_COMMIT_PREPARED;
143+
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
144+
flags = PGLOGICAL_ABORT_PREPARED;
145+
else
146+
Assert(false);
147+
148+
/* send the flags field */
149+
pq_sendbyte(out, flags);
150+
151+
/* send fixed fields */
152+
pq_sendint64(out, commit_lsn);
153+
pq_sendint64(out, txn->end_lsn);
154+
pq_sendint64(out, txn->commit_time);
155+
156+
if (txn->xact_action == XLOG_XACT_PREPARE ||
157+
txn->xact_action == XLOG_XACT_COMMIT_PREPARED ||
158+
txn->xact_action == XLOG_XACT_ABORT_PREPARED)
159+
pq_sendstring(out, txn->gid);
160+
133161
}
134162

135163
/*

contrib/mmts/tests/reinit-mm.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
n_nodes=3
2-
export PATH=~/postgres_cluster/dist/bin/:$PATH
2+
export PATH=~/code/postgres_cluster/install/bin/:$PATH
33
ulimit -c unlimited
44
pkill -9 postgres
55
pkill -9 arbiter

0 commit comments

Comments
 (0)