@@ -295,7 +295,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
295
295
MyLogicalRepWorker -> relstate ,
296
296
MyLogicalRepWorker -> relstate_lsn );
297
297
298
- walrcv_endstreaming (wrconn , & tli );
298
+ walrcv_endstreaming (LogRepWorkerWalRcvConn , & tli );
299
299
finish_sync_worker ();
300
300
}
301
301
else
@@ -591,7 +591,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
591
591
for (;;)
592
592
{
593
593
/* Try read the data. */
594
- len = walrcv_receive (wrconn , & buf , & fd );
594
+ len = walrcv_receive (LogRepWorkerWalRcvConn , & buf , & fd );
595
595
596
596
CHECK_FOR_INTERRUPTS ();
597
597
@@ -665,7 +665,8 @@ fetch_remote_table_info(char *nspname, char *relname,
665
665
" AND c.relkind = 'r'" ,
666
666
quote_literal_cstr (nspname ),
667
667
quote_literal_cstr (relname ));
668
- res = walrcv_exec (wrconn , cmd .data , 2 , tableRow );
668
+ res = walrcv_exec (LogRepWorkerWalRcvConn , cmd .data ,
669
+ lengthof (tableRow ), tableRow );
669
670
670
671
if (res -> status != WALRCV_OK_TUPLES )
671
672
ereport (ERROR ,
@@ -701,9 +702,11 @@ fetch_remote_table_info(char *nspname, char *relname,
701
702
" AND a.attrelid = %u"
702
703
" ORDER BY a.attnum" ,
703
704
lrel -> remoteid ,
704
- (walrcv_server_version (wrconn ) >= 120000 ? "AND a.attgenerated = ''" : "" ),
705
+ (walrcv_server_version (LogRepWorkerWalRcvConn ) >= 120000 ?
706
+ "AND a.attgenerated = ''" : "" ),
705
707
lrel -> remoteid );
706
- res = walrcv_exec (wrconn , cmd .data , 4 , attrRow );
708
+ res = walrcv_exec (LogRepWorkerWalRcvConn , cmd .data ,
709
+ lengthof (attrRow ), attrRow );
707
710
708
711
if (res -> status != WALRCV_OK_TUPLES )
709
712
ereport (ERROR ,
@@ -773,7 +776,7 @@ copy_table(Relation rel)
773
776
initStringInfo (& cmd );
774
777
appendStringInfo (& cmd , "COPY %s TO STDOUT" ,
775
778
quote_qualified_identifier (lrel .nspname , lrel .relname ));
776
- res = walrcv_exec (wrconn , cmd .data , 0 , NULL );
779
+ res = walrcv_exec (LogRepWorkerWalRcvConn , cmd .data , 0 , NULL );
777
780
pfree (cmd .data );
778
781
if (res -> status != WALRCV_OK_COPY_OUT )
779
782
ereport (ERROR ,
@@ -840,8 +843,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
840
843
* application_name, so that it is different from the main apply worker,
841
844
* so that synchronous replication can distinguish them.
842
845
*/
843
- wrconn = walrcv_connect (MySubscription -> conninfo , true, slotname , & err );
844
- if (wrconn == NULL )
846
+ LogRepWorkerWalRcvConn = walrcv_connect (MySubscription -> conninfo , true,
847
+ slotname , & err );
848
+ if (LogRepWorkerWalRcvConn == NULL )
845
849
ereport (ERROR ,
846
850
(errmsg ("could not connect to the publisher: %s" , err )));
847
851
@@ -886,7 +890,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
886
890
* inside the transaction so that we can use the snapshot made
887
891
* by the slot to get existing data.
888
892
*/
889
- res = walrcv_exec (wrconn ,
893
+ res = walrcv_exec (LogRepWorkerWalRcvConn ,
890
894
"BEGIN READ ONLY ISOLATION LEVEL "
891
895
"REPEATABLE READ" , 0 , NULL );
892
896
if (res -> status != WALRCV_OK_COMMAND )
@@ -903,14 +907,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
903
907
* that is consistent with the lsn used by the slot to start
904
908
* decoding.
905
909
*/
906
- walrcv_create_slot (wrconn , slotname , true,
910
+ walrcv_create_slot (LogRepWorkerWalRcvConn , slotname , true,
907
911
CRS_USE_SNAPSHOT , origin_startpos );
908
912
909
913
PushActiveSnapshot (GetTransactionSnapshot ());
910
914
copy_table (rel );
911
915
PopActiveSnapshot ();
912
916
913
- res = walrcv_exec (wrconn , "COMMIT" , 0 , NULL );
917
+ res = walrcv_exec (LogRepWorkerWalRcvConn , "COMMIT" , 0 , NULL );
914
918
if (res -> status != WALRCV_OK_COMMAND )
915
919
ereport (ERROR ,
916
920
(errmsg ("table copy could not finish transaction on publisher" ),
0 commit comments