Skip to content

Commit 5c6c0eb

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 700ffed + a380fe6 commit 5c6c0eb

File tree

5 files changed

+22
-11
lines changed

5 files changed

+22
-11
lines changed

contrib/mmts/arbiter.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ static void MtmOpenConnections()
386386
static bool MtmSendToNode(int node, void const* buf, int size)
387387
{
388388
while (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
389-
elog(WARNING, "Arbiter failed to write socket: %d", errno);
389+
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
390390
if (sockets[node] >= 0) {
391391
close(sockets[node]);
392392
}
@@ -395,6 +395,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
395395
MtmOnNodeDisconnect(node+1);
396396
return false;
397397
}
398+
elog(NOTICE, "Arbiter restablish connection with node %d", node+1);
398399
}
399400
return true;
400401
}
@@ -688,6 +689,8 @@ static void MtmTransReceiver(Datum arg)
688689
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
689690
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
690691
commit on smaller subset of nodes */
692+
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
693+
msg->node, Mtm->disabledNodeMask, msg->disabledNodeMask);
691694
ts->status = TRANSACTION_STATUS_ABORTED;
692695
MtmAdjustSubtransactions(ts);
693696
}

contrib/mmts/multimaster.c

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
637637
x->isPrepared = false;
638638
x->isTransactionBlock = IsTransactionBlock();
639639
/* Application name can be changed using the PGAPPNAME environment variable */
640-
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
640+
if (!IsBackgroundWorker && x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
641641
/* reject all user's transactions at offline cluster */
642642
MtmUnlock();
643643
elog(ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
@@ -673,7 +673,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
673673

674674
if (Mtm->disabledNodeMask != 0) {
675675
MtmRefreshClusterStatus(true);
676-
if (Mtm->status != MTM_ONLINE) {
676+
if (!IsBackgroundWorker && Mtm->status != MTM_ONLINE) {
677677
elog(ERROR, "Abort current transaction because this cluster node is not online");
678678
}
679679
}
@@ -683,7 +683,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
683683
/*
684684
* Check whether a global multimaster lock is preventing new transactions from committing, to give wal-senders a chance to catch up
685685
*/
686-
MtmCheckClusterLock();
686+
if (!x->isReplicated) {
687+
MtmCheckClusterLock();
688+
}
687689

688690
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
689691
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
@@ -998,16 +1000,15 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
9981000
if (slotLSN == walLSN) {
9991001
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
10001002
elog(WARNING,"Node %d is caught-up", nodeId);
1001-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10021003
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
10031004
BIT_CLEAR(Mtm->nodeLockerMask, nodeId-1);
10041005
Mtm->nLockers -= 1;
10051006
} else {
10061007
elog(WARNING,"Node %d is caugth-up without locking cluster", nodeId);
10071008
/* We are lucky: caught up without locking the cluster! */
1008-
Mtm->nNodes += 1;
1009-
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
10101009
}
1010+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1011+
Mtm->nNodes += 1;
10111012
caughtUp = true;
10121013
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
10131014
&& slotLSN + MtmMinRecoveryLag > walLSN)
@@ -2250,7 +2251,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22502251

22512252
void MtmExecute(void* work, int size)
22522253
{
2253-
BgwPoolExecute(&Mtm->pool, work, size);
2254+
if (Mtm->status == MTM_RECOVERY) {
2255+
/* During recovery apply changes sequentially to preserve commit order */
2256+
MtmExecutor(0, work, size);
2257+
} else {
2258+
BgwPoolExecute(&Mtm->pool, work, size);
2259+
}
22542260
}
22552261

22562262
static BgwPool*

contrib/mmts/multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include "pglogical_output/hooks.h"
99

1010
#define MTM_TUPLE_TRACE(fmt, ...)
11-
#if 0
11+
#if 1
1212
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1414
#else

contrib/mmts/pglogical_proto.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
147147
return;
148148
}
149149
}
150-
151150
pq_sendbyte(out, 'C'); /* sending COMMIT */
152151

153152
MTM_INFO("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx\n", flags, txn->gid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());

src/backend/replication/logical/decode.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
488488
{
489489
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
490490
TimestampTz commit_time = parsed->xact_time;
491-
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
491+
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
492492
int i;
493493

494494
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -541,6 +541,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541541
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
542542
FilterByOrigin(ctx, origin_id))
543543
{
544+
elog(WARNING, "%d: WAL-SENDER ignore record %lx with origin %d: SnapBuildXactNeedsSkip=%d, FilterByOrigin=%d",
545+
getpid(), buf->origptr, origin_id,
546+
SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr), FilterByOrigin(ctx, origin_id));
544547
for (i = 0; i < parsed->nsubxacts; i++)
545548
{
546549
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);

0 commit comments

Comments
 (0)