Skip to content

Commit b0e6e08

Browse files
committed
Rename the logical replication global "wrconn"
The worker.c global wrconn is only meant to be used by logical apply/ tablesync workers, but there are other variables with the same name. To reduce future confusion rename the global from "wrconn" to "LogRepWorkerWalRcvConn". While this is just cosmetic, it seems better to backpatch it all the way back to 10 where this code appeared, to avoid future backpatching issues. Author: Peter Smith <smithpb2250@gmail.com> Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com
1 parent fae98e3 commit b0e6e08

File tree

4 files changed

+27
-24
lines changed

4 files changed

+27
-24
lines changed

src/backend/replication/logical/launcher.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -727,8 +727,8 @@ static void
727727
logicalrep_worker_onexit(int code, Datum arg)
728728
{
729729
/* Disconnect gracefully from the remote side. */
730-
if (wrconn)
731-
walrcv_disconnect(wrconn);
730+
if (LogRepWorkerWalRcvConn)
731+
walrcv_disconnect(LogRepWorkerWalRcvConn);
732732

733733
logicalrep_worker_detach();
734734

src/backend/replication/logical/tablesync.c

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
303303
MyLogicalRepWorker->relstate,
304304
MyLogicalRepWorker->relstate_lsn);
305305

306-
walrcv_endstreaming(wrconn, &tli);
306+
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
307307
finish_sync_worker();
308308
}
309309
else
@@ -600,7 +600,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
600600
for (;;)
601601
{
602602
/* Try read the data. */
603-
len = walrcv_receive(wrconn, &buf, &fd);
603+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
604604

605605
CHECK_FOR_INTERRUPTS();
606606

@@ -678,7 +678,8 @@ fetch_remote_table_info(char *nspname, char *relname,
678678
" AND c.relkind = 'r'",
679679
quote_literal_cstr(nspname),
680680
quote_literal_cstr(relname));
681-
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
681+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
682+
lengthof(tableRow), tableRow);
682683

683684
if (res->status != WALRCV_OK_TUPLES)
684685
ereport(ERROR,
@@ -714,7 +715,8 @@ fetch_remote_table_info(char *nspname, char *relname,
714715
" AND a.attrelid = %u"
715716
" ORDER BY a.attnum",
716717
lrel->remoteid, lrel->remoteid);
717-
res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
718+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
719+
lengthof(attrRow), attrRow);
718720

719721
if (res->status != WALRCV_OK_TUPLES)
720722
ereport(ERROR,
@@ -784,7 +786,7 @@ copy_table(Relation rel)
784786
initStringInfo(&cmd);
785787
appendStringInfo(&cmd, "COPY %s TO STDOUT",
786788
quote_qualified_identifier(lrel.nspname, lrel.relname));
787-
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
789+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
788790
pfree(cmd.data);
789791
if (res->status != WALRCV_OK_COPY_OUT)
790792
ereport(ERROR,
@@ -850,8 +852,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
850852
* application_name, so that it is different from the main apply worker,
851853
* so that synchronous replication can distinguish them.
852854
*/
853-
wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
854-
if (wrconn == NULL)
855+
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
856+
slotname, &err);
857+
if (LogRepWorkerWalRcvConn == NULL)
855858
ereport(ERROR,
856859
(errmsg("could not connect to the publisher: %s", err)));
857860

@@ -896,7 +899,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
896899
* inside the transaction so that we can use the snapshot made
897900
* by the slot to get existing data.
898901
*/
899-
res = walrcv_exec(wrconn,
902+
res = walrcv_exec(LogRepWorkerWalRcvConn,
900903
"BEGIN READ ONLY ISOLATION LEVEL "
901904
"REPEATABLE READ", 0, NULL);
902905
if (res->status != WALRCV_OK_COMMAND)
@@ -913,14 +916,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
913916
* that is consistent with the lsn used by the slot to start
914917
* decoding.
915918
*/
916-
walrcv_create_slot(wrconn, slotname, true,
919+
walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
917920
CRS_USE_SNAPSHOT, origin_startpos);
918921

919922
PushActiveSnapshot(GetTransactionSnapshot());
920923
copy_table(rel);
921924
PopActiveSnapshot();
922925

923-
res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
926+
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
924927
if (res->status != WALRCV_OK_COMMAND)
925928
ereport(ERROR,
926929
(errmsg("table copy could not finish transaction on publisher"),

src/backend/replication/logical/worker.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ typedef struct SlotErrCallbackArg
112112
static MemoryContext ApplyMessageContext = NULL;
113113
MemoryContext ApplyContext = NULL;
114114

115-
WalReceiverConn *wrconn = NULL;
115+
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
116116

117117
Subscription *MySubscription = NULL;
118118
bool MySubscriptionValid = false;
@@ -1172,7 +1172,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11721172

11731173
MemoryContextSwitchTo(ApplyMessageContext);
11741174

1175-
len = walrcv_receive(wrconn, &buf, &fd);
1175+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
11761176

11771177
if (len != 0)
11781178
{
@@ -1252,7 +1252,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
12521252
MemoryContextReset(ApplyMessageContext);
12531253
}
12541254

1255-
len = walrcv_receive(wrconn, &buf, &fd);
1255+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
12561256
}
12571257
}
12581258

@@ -1282,7 +1282,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
12821282
{
12831283
TimeLineID tli;
12841284

1285-
walrcv_endstreaming(wrconn, &tli);
1285+
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
12861286
break;
12871287
}
12881288

@@ -1449,7 +1449,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
14491449
(uint32) (flushpos >> 32), (uint32) flushpos
14501450
);
14511451

1452-
walrcv_send(wrconn, reply_message->data, reply_message->len);
1452+
walrcv_send(LogRepWorkerWalRcvConn,
1453+
reply_message->data, reply_message->len);
14531454

14541455
if (recvpos > last_recvpos)
14551456
last_recvpos = recvpos;
@@ -1761,19 +1762,18 @@ ApplyWorkerMain(Datum main_arg)
17611762
origin_startpos = replorigin_session_get_progress(false);
17621763
CommitTransactionCommand();
17631764

1764-
wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1765-
&err);
1766-
if (wrconn == NULL)
1765+
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
1766+
MySubscription->name, &err);
1767+
if (LogRepWorkerWalRcvConn == NULL)
17671768
ereport(ERROR,
17681769
(errmsg("could not connect to the publisher: %s", err)));
17691770

17701771
/*
17711772
* We don't really use the output identify_system for anything but it
17721773
* does some initializations on the upstream so let's still call it.
17731774
*/
1774-
(void) walrcv_identify_system(wrconn, &startpointTLI,
1775+
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI,
17751776
&server_version);
1776-
17771777
}
17781778

17791779
/*
@@ -1792,7 +1792,7 @@ ApplyWorkerMain(Datum main_arg)
17921792
options.proto.logical.publication_names = MySubscription->publications;
17931793

17941794
/* Start normal logical streaming replication. */
1795-
walrcv_startstreaming(wrconn, &options);
1795+
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
17961796

17971797
/* Run the main loop. */
17981798
LogicalRepApplyLoop(origin_startpos);

src/include/replication/worker_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ typedef struct LogicalRepWorker
6060
extern MemoryContext ApplyContext;
6161

6262
/* libpqreceiver connection */
63-
extern struct WalReceiverConn *wrconn;
63+
extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
6464

6565
/* Worker and subscription objects. */
6666
extern Subscription *MySubscription;

0 commit comments

Comments
 (0)