25
25
#include "common/connect.h"
26
26
#include "funcapi.h"
27
27
#include "libpq-fe.h"
28
+ #include "libpq/libpq-be-fe-helpers.h"
28
29
#include "mb/pg_wchar.h"
29
30
#include "miscadmin.h"
30
31
#include "pgstat.h"
@@ -113,8 +114,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
113
114
};
114
115
115
116
/* Prototypes for private functions */
116
- static PGresult * libpqrcv_PQexec (PGconn * streamConn , const char * query );
117
- static PGresult * libpqrcv_PQgetResult (PGconn * streamConn );
118
117
static char * stringlist_to_identifierstr (PGconn * conn , List * strings );
119
118
120
119
/*
@@ -148,7 +147,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
148
147
bool must_use_password , const char * appname , char * * err )
149
148
{
150
149
WalReceiverConn * conn ;
151
- PostgresPollingStatusType status ;
152
150
const char * keys [6 ];
153
151
const char * vals [6 ];
154
152
int i = 0 ;
@@ -214,56 +212,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
214
212
Assert (i < lengthof (keys ));
215
213
216
214
conn = palloc0 (sizeof (WalReceiverConn ));
217
- conn -> streamConn = PQconnectStartParams (keys , vals ,
218
- /* expand_dbname = */ true);
219
- if (PQstatus (conn -> streamConn ) == CONNECTION_BAD )
220
- goto bad_connection_errmsg ;
221
-
222
- /*
223
- * Poll connection until we have OK or FAILED status.
224
- *
225
- * Per spec for PQconnectPoll, first wait till socket is write-ready.
226
- */
227
- status = PGRES_POLLING_WRITING ;
228
- do
229
- {
230
- int io_flag ;
231
- int rc ;
232
-
233
- if (status == PGRES_POLLING_READING )
234
- io_flag = WL_SOCKET_READABLE ;
235
- #ifdef WIN32
236
- /* Windows needs a different test while waiting for connection-made */
237
- else if (PQstatus (conn -> streamConn ) == CONNECTION_STARTED )
238
- io_flag = WL_SOCKET_CONNECTED ;
239
- #endif
240
- else
241
- io_flag = WL_SOCKET_WRITEABLE ;
242
-
243
- rc = WaitLatchOrSocket (MyLatch ,
244
- WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag ,
245
- PQsocket (conn -> streamConn ),
246
- 0 ,
247
- WAIT_EVENT_LIBPQWALRECEIVER_CONNECT );
248
-
249
- /* Interrupted? */
250
- if (rc & WL_LATCH_SET )
251
- {
252
- ResetLatch (MyLatch );
253
- ProcessWalRcvInterrupts ();
254
- }
255
-
256
- /* If socket is ready, advance the libpq state machine */
257
- if (rc & io_flag )
258
- status = PQconnectPoll (conn -> streamConn );
259
- } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED );
215
+ conn -> streamConn =
216
+ libpqsrv_connect_params (keys , vals ,
217
+ /* expand_dbname = */ true,
218
+ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT );
260
219
261
220
if (PQstatus (conn -> streamConn ) != CONNECTION_OK )
262
221
goto bad_connection_errmsg ;
263
222
264
223
if (must_use_password && !PQconnectionUsedPassword (conn -> streamConn ))
265
224
{
266
- PQfinish (conn -> streamConn );
225
+ libpqsrv_disconnect (conn -> streamConn );
267
226
pfree (conn );
268
227
269
228
ereport (ERROR ,
@@ -281,8 +240,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
281
240
{
282
241
PGresult * res ;
283
242
284
- res = libpqrcv_PQexec (conn -> streamConn ,
285
- ALWAYS_SECURE_SEARCH_PATH_SQL );
243
+ res = libpqsrv_exec (conn -> streamConn ,
244
+ ALWAYS_SECURE_SEARCH_PATH_SQL ,
245
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
286
246
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
287
247
{
288
248
PQclear (res );
@@ -303,7 +263,7 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
303
263
304
264
/* error path, error already set */
305
265
bad_connection :
306
- PQfinish (conn -> streamConn );
266
+ libpqsrv_disconnect (conn -> streamConn );
307
267
pfree (conn );
308
268
return NULL ;
309
269
}
@@ -454,7 +414,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
454
414
* Get the system identifier and timeline ID as a DataRow message from the
455
415
* primary server.
456
416
*/
457
- res = libpqrcv_PQexec (conn -> streamConn , "IDENTIFY_SYSTEM" );
417
+ res = libpqsrv_exec (conn -> streamConn ,
418
+ "IDENTIFY_SYSTEM" ,
419
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
458
420
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
459
421
{
460
422
PQclear (res );
@@ -631,7 +593,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
631
593
options -> proto .physical .startpointTLI );
632
594
633
595
/* Start streaming. */
634
- res = libpqrcv_PQexec (conn -> streamConn , cmd .data );
596
+ res = libpqsrv_exec (conn -> streamConn ,
597
+ cmd .data ,
598
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
635
599
pfree (cmd .data );
636
600
637
601
if (PQresultStatus (res ) == PGRES_COMMAND_OK )
@@ -661,7 +625,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
661
625
PGresult * res ;
662
626
663
627
/*
664
- * Send copy-end message. As in libpqrcv_PQexec , this could theoretically
628
+ * Send copy-end message. As in libpqsrv_exec , this could theoretically
665
629
* block, but the risk seems small.
666
630
*/
667
631
if (PQputCopyEnd (conn -> streamConn , NULL ) <= 0 ||
@@ -681,7 +645,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
681
645
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
682
646
* also possible in case we aborted the copy in mid-stream.
683
647
*/
684
- res = libpqrcv_PQgetResult (conn -> streamConn );
648
+ res = libpqsrv_get_result (conn -> streamConn ,
649
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
685
650
if (PQresultStatus (res ) == PGRES_TUPLES_OK )
686
651
{
687
652
/*
@@ -696,7 +661,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
696
661
PQclear (res );
697
662
698
663
/* the result set should be followed by CommandComplete */
699
- res = libpqrcv_PQgetResult (conn -> streamConn );
664
+ res = libpqsrv_get_result (conn -> streamConn ,
665
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
700
666
}
701
667
else if (PQresultStatus (res ) == PGRES_COPY_OUT )
702
668
{
@@ -710,7 +676,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
710
676
pchomp (PQerrorMessage (conn -> streamConn )))));
711
677
712
678
/* CommandComplete should follow */
713
- res = libpqrcv_PQgetResult (conn -> streamConn );
679
+ res = libpqsrv_get_result (conn -> streamConn ,
680
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
714
681
}
715
682
716
683
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
@@ -721,7 +688,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
721
688
PQclear (res );
722
689
723
690
/* Verify that there are no more results */
724
- res = libpqrcv_PQgetResult (conn -> streamConn );
691
+ res = libpqsrv_get_result (conn -> streamConn ,
692
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
725
693
if (res != NULL )
726
694
ereport (ERROR ,
727
695
(errcode (ERRCODE_PROTOCOL_VIOLATION ),
@@ -746,7 +714,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
746
714
* Request the primary to send over the history file for given timeline.
747
715
*/
748
716
snprintf (cmd , sizeof (cmd ), "TIMELINE_HISTORY %u" , tli );
749
- res = libpqrcv_PQexec (conn -> streamConn , cmd );
717
+ res = libpqsrv_exec (conn -> streamConn ,
718
+ cmd ,
719
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
750
720
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
751
721
{
752
722
PQclear (res );
@@ -776,114 +746,13 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
776
746
PQclear (res );
777
747
}
778
748
779
- /*
780
- * Send a query and wait for the results by using the asynchronous libpq
781
- * functions and socket readiness events.
782
- *
783
- * The function is modeled on libpqsrv_exec(), with the behavior difference
784
- * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
785
- * skips try/catch, since all errors terminate the process.
786
- *
787
- * May return NULL, rather than an error result, on failure.
788
- */
789
- static PGresult *
790
- libpqrcv_PQexec (PGconn * streamConn , const char * query )
791
- {
792
- PGresult * lastResult = NULL ;
793
-
794
- /*
795
- * PQexec() silently discards any prior query results on the connection.
796
- * This is not required for this function as it's expected that the caller
797
- * (which is this library in all cases) will behave correctly and we don't
798
- * have to be backwards compatible with old libpq.
799
- */
800
-
801
- /*
802
- * Submit the query. Since we don't use non-blocking mode, this could
803
- * theoretically block. In practice, since we don't send very long query
804
- * strings, the risk seems negligible.
805
- */
806
- if (!PQsendQuery (streamConn , query ))
807
- return NULL ;
808
-
809
- for (;;)
810
- {
811
- /* Wait for, and collect, the next PGresult. */
812
- PGresult * result ;
813
-
814
- result = libpqrcv_PQgetResult (streamConn );
815
- if (result == NULL )
816
- break ; /* query is complete, or failure */
817
-
818
- /*
819
- * Emulate PQexec()'s behavior of returning the last result when there
820
- * are many. We are fine with returning just last error message.
821
- */
822
- PQclear (lastResult );
823
- lastResult = result ;
824
-
825
- if (PQresultStatus (lastResult ) == PGRES_COPY_IN ||
826
- PQresultStatus (lastResult ) == PGRES_COPY_OUT ||
827
- PQresultStatus (lastResult ) == PGRES_COPY_BOTH ||
828
- PQstatus (streamConn ) == CONNECTION_BAD )
829
- break ;
830
- }
831
-
832
- return lastResult ;
833
- }
834
-
835
- /*
836
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
837
- */
838
- static PGresult *
839
- libpqrcv_PQgetResult (PGconn * streamConn )
840
- {
841
- /*
842
- * Collect data until PQgetResult is ready to get the result without
843
- * blocking.
844
- */
845
- while (PQisBusy (streamConn ))
846
- {
847
- int rc ;
848
-
849
- /*
850
- * We don't need to break down the sleep into smaller increments,
851
- * since we'll get interrupted by signals and can handle any
852
- * interrupts here.
853
- */
854
- rc = WaitLatchOrSocket (MyLatch ,
855
- WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
856
- WL_LATCH_SET ,
857
- PQsocket (streamConn ),
858
- 0 ,
859
- WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
860
-
861
- /* Interrupted? */
862
- if (rc & WL_LATCH_SET )
863
- {
864
- ResetLatch (MyLatch );
865
- ProcessWalRcvInterrupts ();
866
- }
867
-
868
- /* Consume whatever data is available from the socket */
869
- if (PQconsumeInput (streamConn ) == 0 )
870
- {
871
- /* trouble; return NULL */
872
- return NULL ;
873
- }
874
- }
875
-
876
- /* Now we can collect and return the next PGresult */
877
- return PQgetResult (streamConn );
878
- }
879
-
880
749
/*
881
750
* Disconnect connection to primary, if any.
882
751
*/
883
752
static void
884
753
libpqrcv_disconnect (WalReceiverConn * conn )
885
754
{
886
- PQfinish (conn -> streamConn );
755
+ libpqsrv_disconnect (conn -> streamConn );
887
756
PQfreemem (conn -> recvBuf );
888
757
pfree (conn );
889
758
}
@@ -937,13 +806,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
937
806
{
938
807
PGresult * res ;
939
808
940
- res = libpqrcv_PQgetResult (conn -> streamConn );
809
+ res = libpqsrv_get_result (conn -> streamConn ,
810
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
941
811
if (PQresultStatus (res ) == PGRES_COMMAND_OK )
942
812
{
943
813
PQclear (res );
944
814
945
815
/* Verify that there are no more results. */
946
- res = libpqrcv_PQgetResult (conn -> streamConn );
816
+ res = libpqsrv_get_result (conn -> streamConn ,
817
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
947
818
if (res != NULL )
948
819
{
949
820
PQclear (res );
@@ -1094,7 +965,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
1094
965
appendStringInfoString (& cmd , " PHYSICAL RESERVE_WAL" );
1095
966
}
1096
967
1097
- res = libpqrcv_PQexec (conn -> streamConn , cmd .data );
968
+ res = libpqsrv_exec (conn -> streamConn ,
969
+ cmd .data ,
970
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
1098
971
pfree (cmd .data );
1099
972
1100
973
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
@@ -1147,7 +1020,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
1147
1020
1148
1021
appendStringInfoString (& cmd , " );" );
1149
1022
1150
- res = libpqrcv_PQexec (conn -> streamConn , cmd .data );
1023
+ res = libpqsrv_exec (conn -> streamConn , cmd .data ,
1024
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
1151
1025
pfree (cmd .data );
1152
1026
1153
1027
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
@@ -1214,7 +1088,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
1214
1088
{
1215
1089
char * cstrs [MaxTupleAttributeNumber ];
1216
1090
1217
- ProcessWalRcvInterrupts ();
1091
+ CHECK_FOR_INTERRUPTS ();
1218
1092
1219
1093
/* Do the allocations in temporary context. */
1220
1094
oldcontext = MemoryContextSwitchTo (rowcontext );
@@ -1260,7 +1134,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
1260
1134
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1261
1135
errmsg ("the query interface requires a database connection" )));
1262
1136
1263
- pgres = libpqrcv_PQexec (conn -> streamConn , query );
1137
+ pgres = libpqsrv_exec (conn -> streamConn ,
1138
+ query ,
1139
+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
1264
1140
1265
1141
switch (PQresultStatus (pgres ))
1266
1142
{
0 commit comments