@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
35
35
uint32 timeline , char * basedir ,
36
36
stream_stop_callback stream_stop , int standby_message_timeout ,
37
37
char * partial_suffix , XLogRecPtr * stoppos );
38
+ static int CopyStreamPoll (PGconn * conn , long timeout_ms );
39
+ static int CopyStreamReceive (PGconn * conn , long timeout , char * * buffer );
38
40
39
41
static bool ReadEndOfStreamingResult (PGresult * res , XLogRecPtr * startpos ,
40
42
uint32 * timeline );
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
744
746
int bytes_written ;
745
747
int64 now ;
746
748
int hdr_len ;
747
-
748
- if (copybuf != NULL )
749
- {
750
- PQfreemem (copybuf );
751
- copybuf = NULL ;
752
- }
749
+ long sleeptime ;
753
750
754
751
/*
755
752
* Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
784
781
last_status = now ;
785
782
}
786
783
787
- r = PQgetCopyData (conn , & copybuf , 1 );
788
- if (r == 0 )
784
+ /*
785
+ * Compute how long send/receive loops should sleep
786
+ */
787
+ if (standby_message_timeout && still_sending )
789
788
{
790
- /*
791
- * No data available. Wait for some to appear, but not longer than
792
- * the specified timeout, so that we can ping the server.
793
- */
794
- fd_set input_mask ;
795
- struct timeval timeout ;
796
- struct timeval * timeoutptr ;
797
-
798
- FD_ZERO ( & input_mask );
799
- FD_SET ( PQsocket ( conn ), & input_mask );
800
- if (standby_message_timeout && still_sending )
789
+ int64 targettime ;
790
+ long secs ;
791
+ int usecs ;
792
+
793
+ targettime = last_status + ( standby_message_timeout - 1 ) * (( int64 ) 1000 ) ;
794
+ feTimestampDifference ( now ,
795
+ targettime ,
796
+ & secs ,
797
+ & usecs );
798
+ /* Always sleep at least 1 sec */
799
+ if (secs <= 0 )
801
800
{
802
- int64 targettime ;
803
- long secs ;
804
- int usecs ;
805
-
806
- targettime = last_status + (standby_message_timeout - 1 ) * ((int64 ) 1000 );
807
- feTimestampDifference (now ,
808
- targettime ,
809
- & secs ,
810
- & usecs );
811
- if (secs <= 0 )
812
- timeout .tv_sec = 1 ; /* Always sleep at least 1 sec */
813
- else
814
- timeout .tv_sec = secs ;
815
- timeout .tv_usec = usecs ;
816
- timeoutptr = & timeout ;
801
+ secs = 1 ;
802
+ usecs = 0 ;
817
803
}
818
- else
819
- timeoutptr = NULL ;
820
804
821
- r = select (PQsocket (conn ) + 1 , & input_mask , NULL , NULL , timeoutptr );
822
- if (r == 0 || (r < 0 && errno == EINTR ))
823
- {
824
- /*
825
- * Got a timeout or signal. Continue the loop and either
826
- * deliver a status packet to the server or just go back into
827
- * blocking.
828
- */
829
- continue ;
830
- }
831
- else if (r < 0 )
832
- {
833
- fprintf (stderr , _ ("%s: select() failed: %s\n" ),
834
- progname , strerror (errno ));
835
- goto error ;
836
- }
837
- /* Else there is actually data on the socket */
838
- if (PQconsumeInput (conn ) == 0 )
839
- {
840
- fprintf (stderr ,
841
- _ ("%s: could not receive data from WAL stream: %s" ),
842
- progname , PQerrorMessage (conn ));
843
- goto error ;
844
- }
845
- continue ;
805
+ sleeptime = secs * 1000 + usecs / 1000 ;
846
806
}
807
+ else
808
+ sleeptime = -1 ;
809
+
810
+ r = CopyStreamReceive (conn , sleeptime , & copybuf );
811
+ if (r == 0 )
812
+ continue ;
847
813
if (r == -1 )
814
+ goto error ;
815
+ if (r == -2 )
848
816
{
849
817
PGresult * res = PQgetResult (conn );
850
818
@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
877
845
}
878
846
if (copybuf != NULL )
879
847
PQfreemem (copybuf );
848
+ copybuf = NULL ;
880
849
* stoppos = blockpos ;
881
850
return res ;
882
851
}
883
- if (r == -2 )
884
- {
885
- fprintf (stderr , _ ("%s: could not read COPY data: %s" ),
886
- progname , PQerrorMessage (conn ));
887
- goto error ;
888
- }
889
852
890
853
/* Check the message type. */
891
854
if (copybuf [0 ] == 'k' )
@@ -1056,3 +1019,115 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
1056
1019
PQfreemem (copybuf );
1057
1020
return NULL ;
1058
1021
}
1022
+
1023
+ /*
1024
+ * Wait until we can read CopyData message, or timeout.
1025
+ *
1026
+ * Returns 1 if data has become available for reading, 0 if timed out
1027
+ * or interrupted by signal, and -1 on an error.
1028
+ */
1029
+ static int
1030
+ CopyStreamPoll (PGconn * conn , long timeout_ms )
1031
+ {
1032
+ int ret ;
1033
+ fd_set input_mask ;
1034
+ struct timeval timeout ;
1035
+ struct timeval * timeoutptr ;
1036
+
1037
+ if (PQsocket (conn ) < 0 )
1038
+ {
1039
+ fprintf (stderr , _ ("%s: socket not open" ), progname );
1040
+ return -1 ;
1041
+ }
1042
+
1043
+ FD_ZERO (& input_mask );
1044
+ FD_SET (PQsocket (conn ), & input_mask );
1045
+
1046
+ if (timeout_ms < 0 )
1047
+ timeoutptr = NULL ;
1048
+ else
1049
+ {
1050
+ timeout .tv_sec = timeout_ms / 1000L ;
1051
+ timeout .tv_usec = (timeout_ms % 1000L ) * 1000L ;
1052
+ timeoutptr = & timeout ;
1053
+ }
1054
+
1055
+ ret = select (PQsocket (conn ) + 1 , & input_mask , NULL , NULL , timeoutptr );
1056
+ if (ret == 0 || (ret < 0 && errno == EINTR ))
1057
+ return 0 ; /* Got a timeout or signal */
1058
+ else if (ret < 0 )
1059
+ {
1060
+ fprintf (stderr , _ ("%s: select() failed: %s\n" ),
1061
+ progname , strerror (errno ));
1062
+ return -1 ;
1063
+ }
1064
+
1065
+ return 1 ;
1066
+ }
1067
+
1068
+ /*
1069
+ * Receive CopyData message available from XLOG stream, blocking for
1070
+ * maximum of 'timeout' ms.
1071
+ *
1072
+ * If data was received, returns the length of the data. *buffer is set to
1073
+ * point to a buffer holding the received message. The buffer is only valid
1074
+ * until the next CopyStreamReceive call.
1075
+ *
1076
+ * 0 if no data was available within timeout, or wait was interrupted
1077
+ * by signal. -1 on error. -2 if the server ended the COPY.
1078
+ */
1079
+ static int
1080
+ CopyStreamReceive (PGconn * conn , long timeout , char * * buffer )
1081
+ {
1082
+ static char * copybuf = NULL ;
1083
+ int rawlen ;
1084
+
1085
+ if (copybuf != NULL )
1086
+ PQfreemem (copybuf );
1087
+ copybuf = NULL ;
1088
+ * buffer = NULL ;
1089
+
1090
+ /* Try to receive a CopyData message */
1091
+ rawlen = PQgetCopyData (conn , & copybuf , 1 );
1092
+ if (rawlen == 0 )
1093
+ {
1094
+ /*
1095
+ * No data available. Wait for some to appear, but not longer than
1096
+ * the specified timeout, so that we can ping the server.
1097
+ */
1098
+ if (timeout > 0 )
1099
+ {
1100
+ int ret ;
1101
+
1102
+ ret = CopyStreamPoll (conn , timeout );
1103
+ if (ret <= 0 )
1104
+ return ret ;
1105
+ }
1106
+
1107
+ /* Else there is actually data on the socket */
1108
+ if (PQconsumeInput (conn ) == 0 )
1109
+ {
1110
+ fprintf (stderr ,
1111
+ _ ("%s: could not receive data from WAL stream: %s" ),
1112
+ progname , PQerrorMessage (conn ));
1113
+ return -1 ;
1114
+ }
1115
+
1116
+ /* Now that we've consumed some input, try again */
1117
+ rawlen = PQgetCopyData (conn , & copybuf , 1 );
1118
+ if (rawlen == 0 )
1119
+ return 0 ;
1120
+ }
1121
+ if (rawlen == -1 ) /* end-of-streaming or error */
1122
+ return -2 ;
1123
+ if (rawlen == -2 )
1124
+ {
1125
+ fprintf (stderr , _ ("%s: could not read COPY data: %s" ),
1126
+ progname , PQerrorMessage (conn ));
1127
+ return -1 ;
1128
+ }
1129
+
1130
+ /* Return received messages to caller */
1131
+ * buffer = copybuf ;
1132
+ return rawlen ;
1133
+ }
0 commit comments