Skip to content

Commit 3bb92c1

Browse files
committed
Fix bugs in 3PC implementation
1 parent 37624c4 commit 3bb92c1

File tree

7 files changed

+16
-13
lines changed

7 files changed

+16
-13
lines changed

contrib/mmts/multimaster.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1311,6 +1311,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
13111311

13121312
if (MtmIsCoordinator(ts)) {
13131313
int i;
1314+
Assert(false); // All broadcasts are now done through logical decoding
13141315
for (i = 0; i < Mtm->nAllNodes; i++)
13151316
{
13161317
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask, i))
@@ -3228,9 +3229,10 @@ bool MtmFilterTransaction(char* record, int size)
32283229
origin_node != 0 &&
32293230
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
32303231

3231-
switch(PGLOGICAL_XACT_EVENT(flags))
3232+
switch (PGLOGICAL_XACT_EVENT(flags))
32323233
{
32333234
case PGLOGICAL_PREPARE:
3235+
case PGLOGICAL_PRECOMMIT_PREPARED:
32343236
case PGLOGICAL_ABORT_PREPARED:
32353237
gid = pq_getmsgstring(&s);
32363238
break;

contrib/mmts/multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
#define MTM_TXTRACE(tx, event)
4040
#else
4141
#define MTM_TXTRACE(tx, event) \
42-
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, getpid())
42+
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
4343
#endif
4444

4545
#define MULTIMASTER_NAME "multimaster"

contrib/mmts/pglogical_apply.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,9 @@ process_remote_commit(StringInfo in)
625625
{
626626
case PGLOGICAL_PRECOMMIT_PREPARED:
627627
{
628+
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
628629
gid = pq_getmsgstring(in);
630+
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s", MyProcPid, gid);
629631
MtmPrecommitTransaction(gid);
630632
return;
631633
}

contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
192192
else
193193
Assert(false);
194194

195-
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE || flags == PGLOGICAL_PRECOMMIT_PREPARED) {
195+
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
196+
/* COMMIT and PREPARE are preceded by BEGIN, which set MtmIsFilteredTxn flag */
196197
if (MtmIsFilteredTxn) {
197198
Assert(MtmTransactionRecords == 0);
198199
return;

src/backend/access/transam/twophase.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,10 @@ TwoPhaseShmemSize(void)
216216
/* Need the fixed struct, the array of pointers, and the GTD structs */
217217
size = offsetof(TwoPhaseStateData, prepXacts);
218218
size = add_size(size, mul_size(max_prepared_xacts,
219-
sizeof(GlobalTransaction)));
219+
sizeof(GlobalTransaction)*2));
220220
size = MAXALIGN(size);
221221
size = add_size(size, mul_size(max_prepared_xacts,
222-
sizeof(GlobalTransactionData)*2));
222+
sizeof(GlobalTransactionData)));
223223

224224
return size;
225225
}
@@ -247,9 +247,9 @@ TwoPhaseShmemInit(void)
247247
gxacts = (GlobalTransaction)
248248
((char *) TwoPhaseState +
249249
MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
250-
sizeof(GlobalTransaction) * max_prepared_xacts));
250+
sizeof(GlobalTransaction) * 2 * max_prepared_xacts));
251251

252-
TwoPhaseState->hashTable = (GlobalTransaction*)&gxacts[max_prepared_xacts];
252+
TwoPhaseState->hashTable = &TwoPhaseState->prepXacts[max_prepared_xacts];
253253

254254
for (i = 0; i < max_prepared_xacts; i++)
255255
{
@@ -696,9 +696,7 @@ void SetPrepareTransactionState(char const* gid, char const* state)
696696
strcpy(gxact->state_3pc, state);
697697
EndPrepare(gxact);
698698

699-
/* Unlock GXact */
700-
gxact->locking_backend = InvalidBackendId;
701-
MyLockedGxact = NULL;
699+
PostPrepare_Twophase();
702700
}
703701

704702
/* Working status for pg_prepared_xact */

src/backend/replication/logical/reorderbuffer.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,7 +1346,7 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
13461346
*
13471347
* We currently can only decode a transaction's contents in when their commit
13481348
* record is read because that's currently the only place where we know about
1349-
* cache invalidations. Thus, once a toplevel commit is read, we iterate over
1349+
* cache invalidati ons. Thus, once a toplevel commit is read, we iterate over
13501350
* the top and subtransactions (using a k-way merge) and replay the changes in
13511351
* lsn order.
13521352
*/
@@ -1734,7 +1734,7 @@ ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
17341734
txn->origin_lsn = origin_lsn;
17351735
txn->xact_action = rb->xact_action;
17361736
strcpy(txn->gid, rb->gid);
1737-
*txn->gid = '\0';
1737+
*txn->state_3pc = '\0';
17381738

17391739
rb->commit(rb, txn, commit_lsn);
17401740
}

src/include/catalog/pg_proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3057,7 +3057,7 @@ DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 1 1000 0 0 f f f f t t
30573057
DESCR("view system lock information");
30583058
DATA(insert OID = 2561 ( pg_blocking_pids PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 1007 "23" _null_ _null_ _null_ _null_ _null_ pg_blocking_pids _null_ _null_ _null_ ));
30593059
DESCR("get array of PIDs of sessions blocking specified backend PID");
3060-
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26,25}" "{o,o,o,o,o,0}" "{transaction,gid,prepared,ownerid,dbid,state_3pc}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
3060+
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26,25}" "{o,o,o,o,o,o}" "{transaction,gid,prepared,ownerid,dbid,state3pc}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
30613061
DESCR("view two-phase transactions");
30623062
DATA(insert OID = 3445 ( pg_precommit_prepared PGNSP PGUID 12 1 0 0 0 f f f f t f v s 2 0 2278 "2275,2275" _null_ _null_ _null_ _null_ _null_ pg_precommit_prepared _null_ _null_ _null_ ));
30633063
DESCR("alter state of prepared transaction (used for 3pc)");

0 commit comments

Comments
 (0)