Skip to content

Commit b8c3075

Browse files
committed
Fix bug in flushed_lsn reporting
1 parent c1f3c26 commit b8c3075

File tree

6 files changed

+80
-58
lines changed

6 files changed

+80
-58
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3535
LANGUAGE C;
3636

3737
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
38-
"xidHashSize" bigint, "gidHashSize" bigint, "oldestSnapshot" bigint, "configChanges" integer);
38+
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" integer, "configChanges" integer);
3939

4040
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
4141
AS 'MODULE_PATHNAME','mtm_get_cluster_state'

contrib/mmts/multimaster.c

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ static int MtmMinRecoveryLag;
200200
static int MtmMaxRecoveryLag;
201201
static int Mtm2PCPrepareRatio;
202202
static int Mtm2PCMinTimeout;
203+
static int MtmGcPeriod;
203204
static bool MtmIgnoreTablesWithoutPk;
204205

205206
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -342,16 +343,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
342343
Snapshot MtmGetSnapshot(Snapshot snapshot)
343344
{
344345
snapshot = PgGetSnapshotData(snapshot);
345-
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
346+
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;
346347
return snapshot;
347348
}
348349

349350

350351
TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
351352
{
352353
TransactionId xmin = PgGetOldestXmin(NULL, false); /* consider all backends */
353-
xmin = MtmAdjustOldestXid(xmin);
354-
return xmin;
354+
if (TransactionIdIsValid(xmin)) {
355+
MtmLock(LW_EXCLUSIVE);
356+
xmin = MtmAdjustOldestXid(xmin);
357+
MtmUnlock();
358+
}
359+
return xmin;
355360
}
356361

357362
bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
@@ -446,53 +451,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
446451
static TransactionId
447452
MtmAdjustOldestXid(TransactionId xid)
448453
{
449-
if (TransactionIdIsValid(xid)) {
450-
MtmTransState *ts, *prev = NULL;
451-
int i;
452-
453-
MtmLock(LW_EXCLUSIVE);
454-
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
455-
if (ts != NULL) {
456-
csn_t oldestSnapshot = ts->snapshot;
457-
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
458-
for (i = 0; i < Mtm->nAllNodes; i++) {
459-
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
460-
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
461-
{
462-
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
463-
}
464-
}
465-
oldestSnapshot -= MtmVacuumDelay*USECS_PER_SEC;
466-
467-
for (ts = Mtm->transListHead;
468-
ts != NULL
469-
&& ts->csn < oldestSnapshot
470-
&& TransactionIdPrecedes(ts->xid, xid)
471-
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
472-
ts->status == TRANSACTION_STATUS_ABORTED);
473-
prev = ts, ts = ts->next)
454+
int i;
455+
MtmTransState *prev = NULL;
456+
MtmTransState *ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
457+
MTM_LOG1("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d", MyProcPid, xid, ts != NULL ? ts->snapshot : 0, ts != NULL ? ts->csn : 0, ts != NULL ? ts->status : -1);
458+
Mtm->gcCount = 0;
459+
if (ts != NULL) {
460+
csn_t oldestSnapshot = ts->snapshot;
461+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
462+
for (i = 0; i < Mtm->nAllNodes; i++) {
463+
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
464+
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
474465
{
475-
if (prev != NULL) {
476-
/* Remove information about too old transactions */
477-
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
478-
}
466+
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
479467
}
480-
}
481-
if (MtmUseDtm)
468+
}
469+
oldestSnapshot -= MtmVacuumDelay*USECS_PER_SEC;
470+
471+
for (ts = Mtm->transListHead;
472+
ts != NULL
473+
&& ts->csn < oldestSnapshot
474+
&& TransactionIdPrecedes(ts->xid, xid)
475+
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
476+
ts->status == TRANSACTION_STATUS_ABORTED);
477+
prev = ts, ts = ts->next)
482478
{
483479
if (prev != NULL) {
484-
Mtm->transListHead = prev;
485-
Mtm->oldestXid = xid = prev->xid;
486-
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
487-
xid = Mtm->oldestXid;
488-
}
489-
} else {
490-
if (prev != NULL) {
491-
Mtm->transListHead = prev;
480+
/* Remove information about too old transactions */
481+
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
492482
}
493483
}
494-
MtmUnlock();
495-
}
484+
}
485+
if (MtmUseDtm)
486+
{
487+
if (prev != NULL) {
488+
Mtm->transListHead = prev;
489+
Mtm->oldestXid = xid = prev->xid;
490+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
491+
xid = Mtm->oldestXid;
492+
}
493+
} else {
494+
if (prev != NULL) {
495+
Mtm->transListHead = prev;
496+
}
497+
}
496498
return xid;
497499
}
498500
/*
@@ -614,7 +616,12 @@ static void
614616
MtmBeginTransaction(MtmCurrentTrans* x)
615617
{
616618
if (x->snapshot == INVALID_CSN) {
617-
MtmLock(LW_EXCLUSIVE);
619+
TransactionId xmin = (Mtm->gcCount >= MtmGcPeriod) ? PgGetOldestXmin(NULL, false) : InvalidTransactionId; /* Get oldest xmin outside critical section */
620+
621+
MtmLock(LW_EXCLUSIVE);
622+
if (TransactionIdIsValid(xmin) && Mtm->gcCount >= MtmGcPeriod) {
623+
MtmAdjustOldestXid(xmin);
624+
}
618625
x->xid = GetCurrentTransactionIdIfAny();
619626
x->isReplicated = false;
620627
x->isDistributed = MtmIsUserTransaction();
@@ -690,7 +697,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
690697
}
691698

692699
MtmLock(LW_EXCLUSIVE);
693-
694700
/*
695701
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
696702
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -716,8 +722,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716722

717723
x->isPrepared = true;
718724
x->csn = ts->csn;
719-
725+
720726
Mtm->transCount += 1;
727+
Mtm->gcCount += 1;
728+
721729
MtmTransactionListAppend(ts);
722730
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
723731
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
@@ -1466,8 +1474,9 @@ static void MtmInitialize()
14661474
Mtm->transListHead = NULL;
14671475
Mtm->transListTail = &Mtm->transListHead;
14681476
Mtm->nReceivers = 0;
1469-
Mtm->timeShift = 0;
1477+
Mtm->timeShift = 0;
14701478
Mtm->transCount = 0;
1479+
Mtm->gcCount = 0;
14711480
Mtm->nConfigChanges = 0;
14721481
Mtm->localTablesHashLoaded = false;
14731482
for (i = 0; i < MtmNodes; i++) {
@@ -1600,6 +1609,21 @@ _PG_init(void)
16001609
if (!process_shared_preload_libraries_in_progress)
16011610
return;
16021611

1612+
DefineCustomIntVariable(
1613+
"multimaster.gc_period",
1614+
"Number of distributed transactions after which garbage collection is started",
1615+
"Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map",
1616+
&MtmGcPeriod,
1617+
MTM_HASH_SIZE/10,
1618+
1,
1619+
INT_MAX,
1620+
PGC_BACKEND,
1621+
0,
1622+
NULL,
1623+
NULL,
1624+
NULL
1625+
);
1626+
16031627
DefineCustomIntVariable(
16041628
"multimaster.max_nodes",
16051629
"Maximal number of cluster nodes",
@@ -2339,7 +2363,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
23392363
values[11] = Int32GetDatum(Mtm->recoverySlot);
23402364
values[12] = Int64GetDatum(hash_get_num_entries(MtmXid2State));
23412365
values[13] = Int64GetDatum(hash_get_num_entries(MtmGid2State));
2342-
values[14] = Int64GetDatum(Mtm->oldestSnapshot);
2366+
values[14] = Int32GetDatum(Mtm->oldestXid);
23432367
values[15] = Int32GetDatum(Mtm->nConfigChanges);
23442368

23452369
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ typedef struct
180180
MtmTransState** transListTail; /* Tail of L1 list of all finished transactionds, used to append new elements.
181181
This list is expected to be in CSN ascending order, by strict order may be violated */
182182
uint64 transCount; /* Counter of transactions perfromed by this node */
183+
uint64 gcCount; /* Number of global transactions performed since last GC */
183184
BgwPool pool; /* Pool of background workers for applying logical replication patches */
184185
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
185186
} MtmState;

contrib/mmts/pglogical_apply.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ static void process_remote_insert(StringInfo s, Relation rel);
7575
static void process_remote_update(StringInfo s, Relation rel);
7676
static void process_remote_delete(StringInfo s, Relation rel);
7777

78-
static int MtmReplicationNode;
79-
8078
/*
8179
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
8280
*
@@ -481,8 +479,8 @@ static void
481479
MtmBeginSession(void)
482480
{
483481
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
484-
MtmLockNode(MtmReplicationNode);
485-
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNode);
482+
MtmLockNode(MtmReplicationNodeId);
483+
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNodeId);
486484
Assert(replorigin_session_origin == InvalidRepOriginId);
487485
replorigin_session_origin = replorigin_by_name(slot_name, false);
488486
MTM_LOG3("%d: Begin setup replorigin session: %d", MyProcPid, replorigin_session_origin);
@@ -498,7 +496,7 @@ MtmEndSession(bool unlock)
498496
replorigin_session_origin = InvalidRepOriginId;
499497
replorigin_session_reset();
500498
if (unlock) {
501-
MtmUnlockNode(MtmReplicationNode);
499+
MtmUnlockNode(MtmReplicationNodeId);
502500
}
503501
MTM_LOG3("%d: End reset replorigin session: %d", MyProcPid, replorigin_session_origin);
504502
}
@@ -513,7 +511,7 @@ process_remote_commit(StringInfo in)
513511
XLogRecPtr end_lsn;
514512
/* read flags */
515513
flags = pq_getmsgbyte(in);
516-
MtmReplicationNode = pq_getmsgbyte(in);
514+
MtmReplicationNodeId = pq_getmsgbyte(in);
517515

518516
/* read fields */
519517
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static volatile sig_atomic_t got_sighup = false;
4747

4848
/* GUC variables */
4949
static int receiver_idle_time = 0;
50-
static bool receiver_sync_mode = false;
50+
static bool receiver_sync_mode = true; /* We need sync mode to have up-to-date values of catalog_xmin in replication slots */
5151

5252
/* Worker name */
5353
static char worker_proc[BGW_MAXLEN];

contrib/mmts/tests/dtmacid.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ int main (int argc, char* argv[])
285285
for (int i = 0; i < cfg.nReaders; i++) {
286286
readers[i].wait();
287287
nSelects += readers[i].selects;
288-
nTransactions += writers[i].transactions;
289288
}
290289

291290
time_t elapsed = getCurrentTime() - start;

0 commit comments

Comments
 (0)