Skip to content

Commit ea75de1

Browse files
committed
Prevent recusive broadcast by setting special applicastion name
1 parent 810ca68 commit ea75de1

File tree

3 files changed

+14
-19
lines changed

3 files changed

+14
-19
lines changed

contrib/mmts/multimaster.c

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
800800
* Send notification only if ABORT happens during transaction processing at replicas,
801801
* do not send notification if ABORT is receiver from master
802802
*/
803-
MTM_TRACE("%d: send ABORT notification to coordinator %d\n", MyProcPid, x->gtid.node);
803+
MTM_INFO("%d: send ABORT notification abort transaction %d to coordinator %d\n", MyProcPid, x->gtid.xid, x->gtid.node);
804804
if (ts == NULL) {
805805
Assert(TransactionIdIsValid(x->xid));
806806
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
@@ -1604,6 +1604,11 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
16041604
return Mtm->recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
16051605
}
16061606

1607+
static bool MtmIsBroadcast()
1608+
{
1609+
return application_name != NULL && strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) == 0;
1610+
}
1611+
16071612
void MtmRecoverNode(int nodeId)
16081613
{
16091614
if (nodeId <= 0 || nodeId > Mtm->nNodes)
@@ -1613,7 +1618,7 @@ void MtmRecoverNode(int nodeId)
16131618
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
16141619
elog(ERROR, "Node %d was not disabled", nodeId);
16151620
}
1616-
if (!IsTransactionBlock())
1621+
if (!MtmIsBroadcast())
16171622
{
16181623
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", nodeId), true);
16191624
}
@@ -1630,7 +1635,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16301635
}
16311636
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
16321637
Mtm->nNodes -= 1;
1633-
if (!IsTransactionBlock())
1638+
if (!MtmIsBroadcast())
16341639
{
16351640
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
16361641
}
@@ -1650,7 +1655,6 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16501655
static bool
16511656
MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
16521657
{
1653-
elog(WARNING, "MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d", args->origin_id, MtmReplicationNodeId);
16541658
return args->origin_id == InvalidRepOriginId || MtmIsRecoveredNode(MtmReplicationNodeId);
16551659
}
16561660

@@ -1797,7 +1801,6 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
17971801
{
17981802
PGresult *result = PQexec(conn, sql);
17991803
int status = PQresultStatus(result);
1800-
char *errstr;
18011804

18021805
bool ret = status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK;
18031806

@@ -1817,25 +1820,18 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
18171820

18181821
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
18191822
{
1820-
char* conn_str = pstrdup(MtmConnStrs);
1821-
char* conn_str_end = conn_str + strlen(conn_str);
18221823
int i = 0;
18231824
nodemask_t disabledNodeMask = Mtm->disabledNodeMask;
18241825
int failedNode = -1;
18251826
char const* errorMsg = NULL;
18261827
PGconn **conns = palloc0(sizeof(PGconn*)*MtmNodes);
18271828
char* utility_errmsg;
18281829

1829-
while (conn_str < conn_str_end)
1830+
for (i = 0; i < MtmNodes; i++)
18301831
{
1831-
char* p = strchr(conn_str, ',');
1832-
if (p == NULL) {
1833-
p = conn_str_end;
1834-
}
1835-
*p = '\0';
18361832
if (!BIT_CHECK(disabledNodeMask, i))
18371833
{
1838-
conns[i] = PQconnectdb(conn_str);
1834+
conns[i] = PQconnectdb(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
18391835
if (PQstatus(conns[i]) != CONNECTION_OK)
18401836
{
18411837
if (ignoreError)
@@ -1847,12 +1843,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
18471843
do {
18481844
PQfinish(conns[i]);
18491845
} while (--i >= 0);
1850-
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
1846+
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[i].con.connStr, failedNode);
18511847
}
18521848
}
18531849
}
1854-
conn_str = p + 1;
1855-
i += 1;
18561850
}
18571851
Assert(i == MtmNodes);
18581852

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
2727
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
29+
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
2930

3031
#define USEC 1000000
3132

contrib/mmts/pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -936,10 +936,10 @@ void MtmExecutor(int id, void* work, size_t size)
936936
{
937937
EmitErrorReport();
938938
FlushErrorState();
939-
MTM_TRACE("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
939+
MTM_INFO("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
940940
MtmEndSession(false);
941941
AbortCurrentTransaction();
942-
MTM_TRACE("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
942+
MTM_INFO("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
943943
}
944944
PG_END_TRY();
945945

0 commit comments

Comments
 (0)