Skip to content

Commit 458222b

Browse files
committed
Fix deadlock detection
1 parent 757c245 commit 458222b

File tree

4 files changed

+52
-30
lines changed

4 files changed

+52
-30
lines changed

contrib/mmts/multimaster.c

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,29 @@ MtmBeginTransaction(MtmCurrentTrans* x)
668668
}
669669
}
670670

671+
672+
static MtmTransState*
673+
MtmCreateTransState(MtmCurrentTrans* x)
674+
{
675+
bool found;
676+
MtmTransState* ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
677+
if (!found) {
678+
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
679+
ts->snapshot = x->snapshot;
680+
if (TransactionIdIsValid(x->gtid.xid)) {
681+
Assert(x->gtid.node != MtmNodeId);
682+
ts->gtid = x->gtid;
683+
} else {
684+
/* I am coordinator of transaction */
685+
ts->gtid.xid = x->xid;
686+
ts->gtid.node = MtmNodeId;
687+
}
688+
}
689+
return ts;
690+
}
691+
692+
693+
671694
/*
672695
* Prepare transaction for two-phase commit.
673696
* This code is executed by the PRE_PREPARE hook before the PREPARE message is sent to replicas by logical replication
@@ -676,7 +699,7 @@ static void
676699
MtmPrePrepareTransaction(MtmCurrentTrans* x)
677700
{
678701
MtmTransState* ts;
679-
TransactionId *subxids;
702+
TransactionId* subxids;
680703

681704
if (!x->isDistributed) {
682705
return;
@@ -704,14 +727,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
704727
MtmCheckClusterLock();
705728
}
706729

707-
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
708-
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
730+
ts = MtmCreateTransState(x);
709731
/*
710732
* An invalid CSN prevents replication of the transaction by logical replication
711733
*/
712734
ts->snapshot = x->isReplicated || !x->containsDML ? INVALID_CSN : x->snapshot;
713735
ts->csn = MtmAssignCSN();
714-
ts->gtid = x->gtid;
715736
ts->procno = MyProc->pgprocno;
716737
ts->nVotes = 1; /* I am voted myself */
717738
ts->votingCompleted = false;
@@ -723,15 +744,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
723744
x->csn = ts->csn;
724745

725746
Mtm->transCount += 1;
726-
727-
if (TransactionIdIsValid(x->gtid.xid)) {
728-
Assert(x->gtid.node != MtmNodeId);
729-
ts->gtid = x->gtid;
730-
} else {
731-
/* I am coordinator of transaction */
732-
ts->gtid.xid = x->xid;
733-
ts->gtid.node = MtmNodeId;
734-
}
735747
MtmTransactionListAppend(ts);
736748
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
737749
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n",
@@ -845,7 +857,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
845857
MtmTransactionListAppend(ts);
846858
}
847859
MtmSendNotificationMessage(ts, MSG_ABORTED); /* send notification to coordinator */
848-
}
860+
} else if (x->status == TRANSACTION_STATUS_ABORTED && x->isReplicated && !x->isPrepared) {
861+
hash_search(MtmXid2State, &x->xid, HASH_REMOVE, NULL);
862+
}
849863
MtmUnlock();
850864
}
851865
MtmResetTransaction(x);
@@ -869,28 +883,32 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
869883

870884
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
871885
{
886+
MtmTx.gtid = *gtid;
887+
MtmTx.xid = GetCurrentTransactionId();
888+
MtmTx.isReplicated = true;
889+
MtmTx.isDistributed = true;
890+
MtmTx.containsDML = true;
891+
872892
if (globalSnapshot != INVALID_CSN) {
873893
MtmLock(LW_EXCLUSIVE);
874894
MtmSyncClock(globalSnapshot);
895+
MtmTx.snapshot = globalSnapshot;
896+
if (Mtm->status != MTM_RECOVERY) {
897+
MtmCreateTransState(&MtmTx); /* we need local->remote xid mapping for deadlock detection */
898+
}
875899
MtmUnlock();
876900
} else {
877901
globalSnapshot = MtmTx.snapshot;
878902
}
879903
if (!TransactionIdIsValid(gtid->xid)) {
880904
/* In case of recovery InvalidTransactionId is passed */
881905
if (Mtm->status != MTM_RECOVERY) {
882-
elog(PANIC, "Node %d tries to recover node %d which is in %s mode", MtmReplicationNodeId, MtmNodeId, MtmNodeStatusMnem[Mtm->status]);
906+
elog(PANIC, "Node %d tries to recover node %d which is in %s mode", gtid->node, MtmNodeId, MtmNodeStatusMnem[Mtm->status]);
883907
}
884908
} else if (Mtm->status == MTM_RECOVERY) {
885909
/* When recovery is completed we get normal transaction ID and switch to normal mode */
886910
MtmRecoveryCompleted();
887911
}
888-
MtmTx.gtid = *gtid;
889-
MtmTx.xid = GetCurrentTransactionId();
890-
MtmTx.snapshot = globalSnapshot;
891-
MtmTx.isReplicated = true;
892-
MtmTx.isDistributed = true;
893-
MtmTx.containsDML = true;
894912
}
895913

896914
void MtmSetCurrentTransactionGID(char const* gid)

contrib/mmts/t/001_basic_recovery.pl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ sub PostgresNode::inet_connstr {
3131
my @nodes = ();
3232
my $pgconf_common = qq(
3333
listen_addresses = '127.0.0.1'
34-
max_prepared_transactions = 10
35-
max_worker_processes = 10
34+
max_prepared_transactions = 200
35+
max_connections = 200
36+
max_worker_processes = 100
3637
max_wal_senders = 10
3738
max_replication_slots = 10
3839
wal_level = logical
40+
wal_sender_timeout = 0
3941
shared_preload_libraries = 'raftable,multimaster'
40-
multimaster.workers=4
42+
multimaster.workers=10
4143
multimaster.queue_size=10485760 # 10mb
4244
);
4345

contrib/mmts/t/002_dtmbench.pl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,16 @@ sub allocate_ports
6060
listen_addresses = '$host'
6161
unix_socket_directories = ''
6262
port = $pgport
63-
max_prepared_transactions = 1000
64-
max_worker_processes = 10
63+
max_prepared_transactions = 200
64+
max_connections = 200
65+
max_worker_processes = 100
6566
wal_level = logical
6667
fsync = off
6768
max_wal_senders = 10
6869
wal_sender_timeout = 0
6970
max_replication_slots = 10
7071
shared_preload_libraries = 'raftable,multimaster'
71-
multimaster.workers = 4
72+
multimaster.workers = 10
7273
multimaster.queue_size = 10485760 # 10mb
7374
multimaster.node_id = $id
7475
multimaster.conn_strings = '$mm_connstr'

contrib/mmts/t/003_pgbench.pl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,16 @@ sub allocate_ports
6262
listen_addresses = '$host'
6363
unix_socket_directories = ''
6464
port = $pgport
65-
max_prepared_transactions = 1000
66-
max_worker_processes = 10
65+
max_prepared_transactions = 200
66+
max_connections = 200
67+
max_worker_processes = 100
6768
wal_level = logical
6869
fsync = off
6970
max_wal_senders = 10
7071
wal_sender_timeout = 0
7172
max_replication_slots = 10
7273
shared_preload_libraries = 'raftable,multimaster'
73-
multimaster.workers = 4
74+
multimaster.workers = 10
7475
multimaster.queue_size = 10485760 # 10mb
7576
multimaster.node_id = $id
7677
multimaster.conn_strings = '$mm_connstr'

0 commit comments

Comments
 (0)