Skip to content

Commit db16c65

Browse files
committed
Rename the logical replication global "wrconn"
The worker.c global wrconn is only meant to be used by logical apply/ tablesync workers, but there are other variables with the same name. To reduce future confusion rename the global from "wrconn" to "LogRepWorkerWalRcvConn". While this is just cosmetic, it seems better to backpatch it all the way back to 10 where this code appeared, to avoid future backpatching issues. Author: Peter Smith <smithpb2250@gmail.com> Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com
1 parent 7dde987 commit db16c65

File tree

4 files changed

+36
-28
lines changed

4 files changed

+36
-28
lines changed

src/backend/replication/logical/launcher.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -643,8 +643,8 @@ static void
643643
logicalrep_worker_onexit(int code, Datum arg)
644644
{
645645
/* Disconnect gracefully from the remote side. */
646-
if (wrconn)
647-
walrcv_disconnect(wrconn);
646+
if (LogRepWorkerWalRcvConn)
647+
walrcv_disconnect(LogRepWorkerWalRcvConn);
648648

649649
logicalrep_worker_detach();
650650

src/backend/replication/logical/tablesync.c

+21-14
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
302302
MyLogicalRepWorker->relstate,
303303
MyLogicalRepWorker->relstate_lsn);
304304

305-
/* End wal streaming so wrconn can be re-used to drop the slot. */
306-
walrcv_endstreaming(wrconn, &tli);
305+
/*
306+
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
307+
* the slot.
308+
*/
309+
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
307310

308311
/*
309312
* Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
322325
* otherwise, it won't be dropped till the corresponding subscription
323326
* is dropped. So passing missing_ok = false.
324327
*/
325-
ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
328+
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
326329

327330
finish_sync_worker();
328331
}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
642645
for (;;)
643646
{
644647
/* Try read the data. */
645-
len = walrcv_receive(wrconn, &buf, &fd);
648+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
646649

647650
CHECK_FOR_INTERRUPTS();
648651

@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
715718
" AND c.relname = %s",
716719
quote_literal_cstr(nspname),
717720
quote_literal_cstr(relname));
718-
res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
721+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
722+
lengthof(tableRow), tableRow);
719723

720724
if (res->status != WALRCV_OK_TUPLES)
721725
ereport(ERROR,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
752756
" AND a.attrelid = %u"
753757
" ORDER BY a.attnum",
754758
lrel->remoteid,
755-
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
759+
(walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
760+
"AND a.attgenerated = ''" : ""),
756761
lrel->remoteid);
757-
res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
762+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
763+
lengthof(attrRow), attrRow);
758764

759765
if (res->status != WALRCV_OK_TUPLES)
760766
ereport(ERROR,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
841847
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
842848
quote_qualified_identifier(lrel.nspname, lrel.relname));
843849
}
844-
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
850+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
845851
pfree(cmd.data);
846852
if (res->status != WALRCV_OK_COPY_OUT)
847853
ereport(ERROR,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
957963
* application_name, so that it is different from the main apply worker,
958964
* so that synchronous replication can distinguish them.
959965
*/
960-
wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
961-
if (wrconn == NULL)
966+
LogRepWorkerWalRcvConn =
967+
walrcv_connect(MySubscription->conninfo, true, slotname, &err);
968+
if (LogRepWorkerWalRcvConn == NULL)
962969
ereport(ERROR,
963970
(errmsg("could not connect to the publisher: %s", err)));
964971

@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
985992
* breakdown then it wouldn't have succeeded so trying it next time
986993
* seems like a better bet.
987994
*/
988-
ReplicationSlotDropAtPubNode(wrconn, slotname, true);
995+
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
989996
}
990997
else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
991998
{
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
10381045
* ensures that both the replication slot we create (see below) and the
10391046
* COPY are consistent with each other.
10401047
*/
1041-
res = walrcv_exec(wrconn,
1048+
res = walrcv_exec(LogRepWorkerWalRcvConn,
10421049
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
10431050
0, NULL);
10441051
if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
10581065
* slot leading to a dangling slot on the server.
10591066
*/
10601067
HOLD_INTERRUPTS();
1061-
walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
1068+
walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
10621069
CRS_USE_SNAPSHOT, origin_startpos);
10631070
RESUME_INTERRUPTS();
10641071

@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
11001107
copy_table(rel);
11011108
PopActiveSnapshot();
11021109

1103-
res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
1110+
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
11041111
if (res->status != WALRCV_OK_COMMAND)
11051112
ereport(ERROR,
11061113
(errmsg("table copy could not finish transaction on publisher: %s",

src/backend/replication/logical/worker.c

+12-11
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
156156
/* per stream context for streaming transactions */
157157
static MemoryContext LogicalStreamingContext = NULL;
158158

159-
WalReceiverConn *wrconn = NULL;
159+
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
160160

161161
Subscription *MySubscription = NULL;
162162
bool MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
21262126

21272127
MemoryContextSwitchTo(ApplyMessageContext);
21282128

2129-
len = walrcv_receive(wrconn, &buf, &fd);
2129+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
21302130

21312131
if (len != 0)
21322132
{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
22062206
MemoryContextReset(ApplyMessageContext);
22072207
}
22082208

2209-
len = walrcv_receive(wrconn, &buf, &fd);
2209+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
22102210
}
22112211
}
22122212

@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
23122312
}
23132313

23142314
/* All done */
2315-
walrcv_endstreaming(wrconn, &tli);
2315+
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
23162316
}
23172317

23182318
/*
@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
23962396
LSN_FORMAT_ARGS(writepos),
23972397
LSN_FORMAT_ARGS(flushpos));
23982398

2399-
walrcv_send(wrconn, reply_message->data, reply_message->len);
2399+
walrcv_send(LogRepWorkerWalRcvConn,
2400+
reply_message->data, reply_message->len);
24002401

24012402
if (recvpos > last_recvpos)
24022403
last_recvpos = recvpos;
@@ -3090,17 +3091,17 @@ ApplyWorkerMain(Datum main_arg)
30903091
origin_startpos = replorigin_session_get_progress(false);
30913092
CommitTransactionCommand();
30923093

3093-
wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
3094-
&err);
3095-
if (wrconn == NULL)
3094+
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
3095+
MySubscription->name, &err);
3096+
if (LogRepWorkerWalRcvConn == NULL)
30963097
ereport(ERROR,
30973098
(errmsg("could not connect to the publisher: %s", err)));
30983099

30993100
/*
31003101
* We don't really use the output identify_system for anything but it
31013102
* does some initializations on the upstream so let's still call it.
31023103
*/
3103-
(void) walrcv_identify_system(wrconn, &startpointTLI);
3104+
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
31043105
}
31053106

31063107
/*
@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
31163117
options.startpoint = origin_startpos;
31173118
options.slotname = myslotname;
31183119
options.proto.logical.proto_version =
3119-
walrcv_server_version(wrconn) >= 140000 ?
3120+
walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
31203121
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
31213122
options.proto.logical.publication_names = MySubscription->publications;
31223123
options.proto.logical.binary = MySubscription->binary;
31233124
options.proto.logical.streaming = MySubscription->stream;
31243125

31253126
/* Start normal logical streaming replication. */
3126-
walrcv_startstreaming(wrconn, &options);
3127+
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
31273128

31283129
/* Run the main loop. */
31293130
LogicalRepApplyLoop(origin_startpos);

src/include/replication/worker_internal.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
6262
extern MemoryContext ApplyContext;
6363

6464
/* libpqreceiver connection */
65-
extern struct WalReceiverConn *wrconn;
65+
extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
6666

6767
/* Worker and subscription objects. */
6868
extern Subscription *MySubscription;

0 commit comments

Comments
 (0)