Skip to content

Commit bf78c9e

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents fdbeae3 + 37a2e58 commit bf78c9e

File tree

3 files changed

+23
-15
lines changed

3 files changed

+23
-15
lines changed

contrib/mmts/multimaster.c

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
typedef struct {
7474
TransactionId xid; /* local transaction ID */
7575
GlobalTransactionId gtid; /* global transaction ID assigned by coordinator of transaction */
76+
bool isTwoPhase; /* user level 2PC */
7677
bool isReplicated; /* transaction on replica */
7778
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
7879
bool isPrepared; /* transaction is perpared at first stage of 2PC */
@@ -722,6 +723,7 @@ MtmResetTransaction()
722723
x->gtid.xid = InvalidTransactionId;
723724
x->isDistributed = false;
724725
x->isPrepared = false;
726+
x->isTwoPhase = false;
725727
x->status = TRANSACTION_STATUS_UNKNOWN;
726728
}
727729

@@ -749,6 +751,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
749751
x->isReplicated = MtmIsLogicalReceiver;
750752
x->isDistributed = MtmIsUserTransaction();
751753
x->isPrepared = false;
754+
x->isTwoPhase = false;
752755
x->isTransactionBlock = IsTransactionBlock();
753756
/* Application name can be changed usnig PGAPPNAME environment variable */
754757
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
@@ -909,8 +912,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
909912
Assert(ts != NULL);
910913
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
911914
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
912-
bool found;
913-
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, &found);
915+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
914916
Assert(x->gid[0]);
915917
tm->state = ts;
916918
ts->votingCompleted = true;
@@ -928,8 +930,13 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
928930
time_t transTimeout = Max(MSEC_TO_USEC(Mtm2PCMinTimeout), (ts->csn - ts->snapshot)*Mtm2PCPrepareRatio/100);
929931
int result = 0;
930932
int nConfigChanges = Mtm->nConfigChanges;
931-
932933
timestamp_t start = MtmGetSystemTime();
934+
935+
if (x->isTwoPhase) {
936+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
937+
tm->state = ts;
938+
}
939+
933940
/* Wait votes from all nodes until: */
934941
while (!ts->votingCompleted /* all nodes voted */
935942
&& nConfigChanges == Mtm->nConfigChanges /* configarion is changed */
@@ -985,7 +992,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
985992
MtmLock(LW_EXCLUSIVE);
986993
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
987994
if (tm == NULL) {
988-
elog(WARNING, "Global transaciton ID %s is not found", x->gid);
995+
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
989996
} else {
990997
Assert(tm->state != NULL);
991998
MTM_LOG1("Abort prepared transaction %d with gid='%s'", x->xid, x->gid);
@@ -1268,7 +1275,7 @@ void MtmAbortTransaction(MtmTransState* ts)
12681275
Assert(MtmLockCount != 0); /* should be invoked with exclsuive lock */
12691276
if (ts->status != TRANSACTION_STATUS_ABORTED) {
12701277
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1271-
elog(WARNING, "Attempt to rollback already committed transaction %d (%s)", ts->xid, ts->gid);
1278+
elog(LOG, "Attempt to rollback already committed transaction %d (%s)", ts->xid, ts->gid);
12721279
} else {
12731280
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %d", ts->gtid.node, ts->gtid.xid, ts->xid, ts->status);
12741281
ts->status = TRANSACTION_STATUS_ABORTED;
@@ -3804,11 +3811,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38043811
}
38053812
break;
38063813
case TRANS_STMT_PREPARE:
3807-
elog(ERROR, "Two phase commit is not supported by multimaster");
3808-
break;
38093814
case TRANS_STMT_COMMIT_PREPARED:
38103815
case TRANS_STMT_ROLLBACK_PREPARED:
3811-
skipCommand = true;
3816+
MtmTx.isTwoPhase = true;
3817+
strcpy(MtmTx.gid, stmt->gid);
38123818
break;
38133819
default:
38143820
break;
@@ -3966,8 +3972,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39663972
standard_ProcessUtility(parsetree, queryString, context,
39673973
params, dest, completionTag);
39683974
}
3969-
3970-
if (MtmTx.isDistributed && XactIsoLevel != XACT_REPEATABLE_READ && !MtmVolksWagenMode) {
3975+
if (!MtmVolksWagenMode && MtmTx.isDistributed && XactIsoLevel != XACT_REPEATABLE_READ) {
39713976
elog(ERROR, "Isolation level %s is not supported by multimaster", isoLevelStr[XactIsoLevel]);
39723977
}
39733978

@@ -4175,7 +4180,7 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
41754180
}
41764181
MtmGetGtid(xid, &gtid);
41774182
hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
4178-
elog(WARNING, "Distributed deadlock check by backend %d for %u:%u = %d", MyProcPid, gtid.node, gtid.xid, hasDeadlock);
4183+
elog(LOG, "Distributed deadlock check by backend %d for %u:%u = %d", MyProcPid, gtid.node, gtid.xid, hasDeadlock);
41794184
if (!hasDeadlock) {
41804185
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
41814186
* can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not

src/backend/storage/lmgr/predicate.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3901,6 +3901,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation,
39013901
* is going on with it.
39023902
*/
39033903
htsvResult = HeapTupleSatisfiesVacuum(tuple, TransactionXmin, buffer);
3904+
elog(LOG, "HeapTupleSatisfiesVacuum(%d) = %d, TransactionXmin=%d", HeapTupleHeaderGetXmin(tuple->t_data), htsvResult, TransactionXmin);
39043905
switch (htsvResult)
39053906
{
39063907
case HEAPTUPLE_LIVE:
@@ -3939,7 +3940,7 @@ CheckForSerializableConflictOut(bool visible, Relation relation,
39393940
xid = InvalidTransactionId;
39403941
}
39413942
Assert(TransactionIdIsValid(xid));
3942-
Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
3943+
//Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
39433944

39443945
/*
39453946
* Find top level xid. Bail out if xid is too early to be a conflict, or

src/backend/utils/time/tqual.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,8 +1131,9 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
11311131
if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
11321132
{
11331133
/* it must have aborted or crashed */
1134-
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
1135-
InvalidTransactionId);
1134+
if (!TransactionIdIsInProgress(HeapTupleHeaderGetRawXmax(tuple)))
1135+
SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
1136+
InvalidTransactionId);
11361137
return true;
11371138
}
11381139

@@ -1271,8 +1272,9 @@ HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin,
12711272
* Okay, the inserter committed, so it was good at some point. Now what
12721273
* about the deleting transaction?
12731274
*/
1274-
if (tuple->t_infomask & HEAP_XMAX_INVALID)
1275+
if (tuple->t_infomask & HEAP_XMAX_INVALID) {
12751276
return HEAPTUPLE_LIVE;
1277+
}
12761278

12771279
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
12781280
{

0 commit comments

Comments
 (0)