@@ -39,8 +39,9 @@ static bool still_sending = true; /* feedback still needs to be sent? */
39
39
40
40
static PGresult * HandleCopyStream (PGconn * conn , StreamCtl * stream ,
41
41
XLogRecPtr * stoppos );
42
- static int CopyStreamPoll (PGconn * conn , long timeout_ms );
43
- static int CopyStreamReceive (PGconn * conn , long timeout , char * * buffer );
42
+ static int CopyStreamPoll (PGconn * conn , long timeout_ms , pgsocket stop_socket );
43
+ static int CopyStreamReceive (PGconn * conn , long timeout , pgsocket stop_socket ,
44
+ char * * buffer );
44
45
static bool ProcessKeepaliveMsg (PGconn * conn , StreamCtl * stream , char * copybuf ,
45
46
int len , XLogRecPtr blockpos , TimestampTz * last_status );
46
47
static bool ProcessXLogDataMsg (PGconn * conn , StreamCtl * stream , char * copybuf , int len ,
@@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn)
417
418
* return. As long as it returns false, streaming will continue
418
419
* indefinitely.
419
420
*
421
+ * If stream_stop() checks for external input, stop_socket should be set to
422
+ * the FD it checks. This will allow such input to be detected promptly
423
+ * rather than after standby_message_timeout (which might be indefinite).
424
+ * Note that signals will interrupt waits for input as well, but that is
425
+ * race-y since a signal received while busy won't interrupt the wait.
426
+ *
420
427
* standby_message_timeout controls how often we send a message
421
428
* back to the master letting it know our progress, in milliseconds.
429
+ * Zero means no messages are sent.
422
430
* This message will only contain the write location, and never
423
431
* flush or replay.
424
432
*
@@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
825
833
sleeptime = CalculateCopyStreamSleeptime (now , stream -> standby_message_timeout ,
826
834
last_status );
827
835
828
- r = CopyStreamReceive (conn , sleeptime , & copybuf );
836
+ r = CopyStreamReceive (conn , sleeptime , stream -> stop_socket , & copybuf );
829
837
while (r != 0 )
830
838
{
831
839
if (r == -1 )
@@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
870
878
* Process the received data, and any subsequent data we can read
871
879
* without blocking.
872
880
*/
873
- r = CopyStreamReceive (conn , 0 , & copybuf );
881
+ r = CopyStreamReceive (conn , 0 , stream -> stop_socket , & copybuf );
874
882
}
875
883
}
876
884
@@ -881,28 +889,39 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
881
889
}
882
890
883
891
/*
884
- * Wait until we can read CopyData message, or timeout.
892
+ * Wait until we can read a CopyData message,
893
+ * or timeout, or occurrence of a signal or input on the stop_socket.
894
+ * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
885
895
*
886
896
* Returns 1 if data has become available for reading, 0 if timed out
887
- * or interrupted by signal, and -1 on an error.
897
+ * or interrupted by signal or stop_socket input , and -1 on an error.
888
898
*/
889
899
static int
890
- CopyStreamPoll (PGconn * conn , long timeout_ms )
900
+ CopyStreamPoll (PGconn * conn , long timeout_ms , pgsocket stop_socket )
891
901
{
892
902
int ret ;
893
903
fd_set input_mask ;
904
+ int connsocket ;
905
+ int maxfd ;
894
906
struct timeval timeout ;
895
907
struct timeval * timeoutptr ;
896
908
897
- if (PQsocket (conn ) < 0 )
909
+ connsocket = PQsocket (conn );
910
+ if (connsocket < 0 )
898
911
{
899
912
fprintf (stderr , _ ("%s: invalid socket: %s" ), progname ,
900
913
PQerrorMessage (conn ));
901
914
return -1 ;
902
915
}
903
916
904
917
FD_ZERO (& input_mask );
905
- FD_SET (PQsocket (conn ), & input_mask );
918
+ FD_SET (connsocket , & input_mask );
919
+ maxfd = connsocket ;
920
+ if (stop_socket != PGINVALID_SOCKET )
921
+ {
922
+ FD_SET (stop_socket , & input_mask );
923
+ maxfd = Max (maxfd , stop_socket );
924
+ }
906
925
907
926
if (timeout_ms < 0 )
908
927
timeoutptr = NULL ;
@@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
913
932
timeoutptr = & timeout ;
914
933
}
915
934
916
- ret = select (PQsocket (conn ) + 1 , & input_mask , NULL , NULL , timeoutptr );
917
- if (ret == 0 || (ret < 0 && errno == EINTR ))
918
- return 0 ; /* Got a timeout or signal */
919
- else if (ret < 0 )
935
+ ret = select (maxfd + 1 , & input_mask , NULL , NULL , timeoutptr );
936
+
937
+ if (ret < 0 )
920
938
{
939
+ if (errno == EINTR )
940
+ return 0 ; /* Got a signal, so not an error */
921
941
fprintf (stderr , _ ("%s: select() failed: %s\n" ),
922
942
progname , strerror (errno ));
923
943
return -1 ;
924
944
}
945
+ if (ret > 0 && FD_ISSET (connsocket , & input_mask ))
946
+ return 1 ; /* Got input on connection socket */
925
947
926
- return 1 ;
948
+ return 0 ; /* Got timeout or input on stop_socket */
927
949
}
928
950
929
951
/*
@@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
934
956
* point to a buffer holding the received message. The buffer is only valid
935
957
* until the next CopyStreamReceive call.
936
958
*
937
- * 0 if no data was available within timeout, or wait was interrupted
938
- * by signal. -1 on error. -2 if the server ended the COPY.
959
+ * Returns 0 if no data was available within timeout, or if wait was
960
+ * interrupted by signal or stop_socket input.
961
+ * -1 on error. -2 if the server ended the COPY.
939
962
*/
940
963
static int
941
- CopyStreamReceive (PGconn * conn , long timeout , char * * buffer )
964
+ CopyStreamReceive (PGconn * conn , long timeout , pgsocket stop_socket ,
965
+ char * * buffer )
942
966
{
943
967
char * copybuf = NULL ;
944
968
int rawlen ;
@@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
951
975
rawlen = PQgetCopyData (conn , & copybuf , 1 );
952
976
if (rawlen == 0 )
953
977
{
978
+ int ret ;
979
+
954
980
/*
955
- * No data available. Wait for some to appear, but not longer than the
956
- * specified timeout, so that we can ping the server.
981
+ * No data available. Wait for some to appear, but not longer than
982
+ * the specified timeout, so that we can ping the server. Also stop
983
+ * waiting if input appears on stop_socket.
957
984
*/
958
- if (timeout != 0 )
959
- {
960
- int ret ;
961
-
962
- ret = CopyStreamPoll (conn , timeout );
963
- if (ret <= 0 )
964
- return ret ;
965
- }
985
+ ret = CopyStreamPoll (conn , timeout , stop_socket );
986
+ if (ret <= 0 )
987
+ return ret ;
966
988
967
- /* Else there is actually data on the socket */
989
+ /* Now there is actually data on the socket */
968
990
if (PQconsumeInput (conn ) == 0 )
969
991
{
970
992
fprintf (stderr ,
0 commit comments