Skip to content

Commit af5de43

Browse files
committed
Avoid hanging on exit in pglogical_receiver
1 parent 2dbdebb commit af5de43

File tree

3 files changed

+15
-8
lines changed

3 files changed

+15
-8
lines changed

contrib/mmts/multimaster.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2322,10 +2322,16 @@ void MtmReceiverStarted(int nodeId)
23222322
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
23232323
* Slots at other nodes should be removed
23242324
*/
2325-
MtmReplicationMode MtmGetReplicationMode(int nodeId)
2325+
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
23262326
{
23272327
bool recovery = false;
2328-
if (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) {
2328+
2329+
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
2330+
{
2331+
if (*shutdown)
2332+
{
2333+
return REPLMODE_EXIT;
2334+
}
23292335
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
23302336
if (Mtm->status == MTM_RECOVERY) {
23312337
recovery = true;
@@ -2342,7 +2348,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId)
23422348
}
23432349
/* delay opening of other slots until recovery is completed */
23442350
MtmSleep(STATUS_POLL_DELAY);
2345-
return REPLMODE_UNKNOWN;
23462351
}
23472352
if (recovery) {
23482353
MTM_LOG1("Recreate replication slot for node %d after end of recovery", nodeId);

contrib/mmts/multimaster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ typedef enum
117117

118118
typedef enum
119119
{
120-
REPLMODE_UNKNOWN, /* receiver should wait */
120+
REPLMODE_EXIT, /* receiver should exit */
121121
REPLMODE_RECOVERED, /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
122122
REPLMODE_RECOVERY, /* perform recorvery of the node by applying all data from the slot from specified point */
123123
REPLMODE_NORMAL /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
@@ -245,7 +245,7 @@ extern csn_t MtmAssignCSN(void);
245245
extern csn_t MtmSyncClock(csn_t csn);
246246
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot);
247247
extern void MtmReceiverStarted(int nodeId);
248-
extern MtmReplicationMode MtmGetReplicationMode(int nodeId);
248+
extern MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown);
249249
extern void MtmExecute(void* work, int size);
250250
extern void MtmExecutor(int id, void* work, size_t size);
251251
extern void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd);

contrib/mmts/pglogical_receiver.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ feTimestampDifference(int64 start_time, int64 stop_time,
193193

194194
static char const* const MtmReplicationModeName[] =
195195
{
196+
"exit",
196197
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
197198
"recovery", /* perform recorvery of the node by applying all data from theslot from specified point */
198199
"normal" /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
@@ -251,9 +252,10 @@ pglogical_receiver_main(Datum main_arg)
251252
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
252253
* Slots at other nodes should be removed
253254
*/
254-
mode = MtmGetReplicationMode(nodeId);
255-
if (mode == REPLMODE_UNKNOWN) {
256-
continue;
255+
mode = MtmGetReplicationMode(nodeId, &got_sigterm);
256+
if (mode == REPLMODE_EXIT)
257+
{
258+
break;
257259
}
258260
count = Mtm->recoveryCount;
259261

0 commit comments

Comments
 (0)