@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
302
302
MyLogicalRepWorker -> relstate ,
303
303
MyLogicalRepWorker -> relstate_lsn );
304
304
305
- /* End wal streaming so wrconn can be re-used to drop the slot. */
306
- walrcv_endstreaming (wrconn , & tli );
305
+ /*
306
+ * End streaming so that LogRepWorkerWalRcvConn can be used to drop
307
+ * the slot.
308
+ */
309
+ walrcv_endstreaming (LogRepWorkerWalRcvConn , & tli );
307
310
308
311
/*
309
312
* Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
322
325
* otherwise, it won't be dropped till the corresponding subscription
323
326
* is dropped. So passing missing_ok = false.
324
327
*/
325
- ReplicationSlotDropAtPubNode (wrconn , syncslotname , false);
328
+ ReplicationSlotDropAtPubNode (LogRepWorkerWalRcvConn , syncslotname , false);
326
329
327
330
finish_sync_worker ();
328
331
}
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
642
645
for (;;)
643
646
{
644
647
/* Try read the data. */
645
- len = walrcv_receive (wrconn , & buf , & fd );
648
+ len = walrcv_receive (LogRepWorkerWalRcvConn , & buf , & fd );
646
649
647
650
CHECK_FOR_INTERRUPTS ();
648
651
@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
715
718
" AND c.relname = %s" ,
716
719
quote_literal_cstr (nspname ),
717
720
quote_literal_cstr (relname ));
718
- res = walrcv_exec (wrconn , cmd .data , lengthof (tableRow ), tableRow );
721
+ res = walrcv_exec (LogRepWorkerWalRcvConn , cmd .data ,
722
+ lengthof (tableRow ), tableRow );
719
723
720
724
if (res -> status != WALRCV_OK_TUPLES )
721
725
ereport (ERROR ,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
752
756
" AND a.attrelid = %u"
753
757
" ORDER BY a.attnum" ,
754
758
lrel -> remoteid ,
755
- (walrcv_server_version (wrconn ) >= 120000 ? "AND a.attgenerated = ''" : "" ),
759
+ (walrcv_server_version (LogRepWorkerWalRcvConn ) >= 120000 ?
760
+ "AND a.attgenerated = ''" : "" ),
756
761
lrel -> remoteid );
757
- res = walrcv_exec (wrconn , cmd .data , lengthof (attrRow ), attrRow );
762
+ res = walrcv_exec (LogRepWorkerWalRcvConn , cmd .data ,
763
+ lengthof (attrRow ), attrRow );
758
764
759
765
if (res -> status != WALRCV_OK_TUPLES )
760
766
ereport (ERROR ,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
841
847
appendStringInfo (& cmd , " FROM %s) TO STDOUT" ,
842
848
quote_qualified_identifier (lrel .nspname , lrel .relname ));
843
849
}
844
- res = walrcv_exec (wrconn , cmd .data , 0 , NULL );
850
+ res = walrcv_exec (LogRepWorkerWalRcvConn , cmd .data , 0 , NULL );
845
851
pfree (cmd .data );
846
852
if (res -> status != WALRCV_OK_COPY_OUT )
847
853
ereport (ERROR ,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
957
963
* application_name, so that it is different from the main apply worker,
958
964
* so that synchronous replication can distinguish them.
959
965
*/
960
- wrconn = walrcv_connect (MySubscription -> conninfo , true, slotname , & err );
961
- if (wrconn == NULL )
966
+ LogRepWorkerWalRcvConn =
967
+ walrcv_connect (MySubscription -> conninfo , true, slotname , & err );
968
+ if (LogRepWorkerWalRcvConn == NULL )
962
969
ereport (ERROR ,
963
970
(errmsg ("could not connect to the publisher: %s" , err )));
964
971
@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
985
992
* breakdown then it wouldn't have succeeded so trying it next time
986
993
* seems like a better bet.
987
994
*/
988
- ReplicationSlotDropAtPubNode (wrconn , slotname , true);
995
+ ReplicationSlotDropAtPubNode (LogRepWorkerWalRcvConn , slotname , true);
989
996
}
990
997
else if (MyLogicalRepWorker -> relstate == SUBREL_STATE_FINISHEDCOPY )
991
998
{
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1038
1045
* ensures that both the replication slot we create (see below) and the
1039
1046
* COPY are consistent with each other.
1040
1047
*/
1041
- res = walrcv_exec (wrconn ,
1048
+ res = walrcv_exec (LogRepWorkerWalRcvConn ,
1042
1049
"BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ" ,
1043
1050
0 , NULL );
1044
1051
if (res -> status != WALRCV_OK_COMMAND )
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1058
1065
* slot leading to a dangling slot on the server.
1059
1066
*/
1060
1067
HOLD_INTERRUPTS ();
1061
- walrcv_create_slot (wrconn , slotname , false /* permanent */ ,
1068
+ walrcv_create_slot (LogRepWorkerWalRcvConn , slotname , false /* permanent */ ,
1062
1069
CRS_USE_SNAPSHOT , origin_startpos );
1063
1070
RESUME_INTERRUPTS ();
1064
1071
@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1100
1107
copy_table (rel );
1101
1108
PopActiveSnapshot ();
1102
1109
1103
- res = walrcv_exec (wrconn , "COMMIT" , 0 , NULL );
1110
+ res = walrcv_exec (LogRepWorkerWalRcvConn , "COMMIT" , 0 , NULL );
1104
1111
if (res -> status != WALRCV_OK_COMMAND )
1105
1112
ereport (ERROR ,
1106
1113
(errmsg ("table copy could not finish transaction on publisher: %s" ,
0 commit comments