Skip to content

Commit 6e9c818

Browse files
committed
Use standard die() signal handler in walreceiver
This gets rid of the bespoken ProcessWalRcvInterrupts() function, which lets walreceiver terminate at any CHECK_FOR_INTERRUPTS() call. And it's less code anyway. We can now use the standard libpqsrv_connect_params() libpq wrapper from libpq-be-fe-helpers.h, removing more code. We attempted to do that earlier already in commit 728f86f, but that was reverted because it didn't call ProcessWalRcvInterrupts() and therefore didn't react to shutdown requests. Now that ProcessWalRcvInterrupts() is gone, it works. As stated in that commit, this also leads to libpqwalreceiver reserving file descriptors for libpq conncetions, which is nice. Author: Andres Freund <andres@anarazel.de> (the earlier commit) Author: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Reviewed-by: Fujii Masao <masao.fujii@gmail.com> Reviewed-by: Yura Sokolov <y.sokolov@postgrespro.ru>
1 parent 8123e91 commit 6e9c818

File tree

4 files changed

+51
-203
lines changed

4 files changed

+51
-203
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 42 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "common/connect.h"
2626
#include "funcapi.h"
2727
#include "libpq-fe.h"
28+
#include "libpq/libpq-be-fe-helpers.h"
2829
#include "mb/pg_wchar.h"
2930
#include "miscadmin.h"
3031
#include "pgstat.h"
@@ -113,8 +114,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
113114
};
114115

115116
/* Prototypes for private functions */
116-
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
117-
static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
118117
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
119118

120119
/*
@@ -148,7 +147,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
148147
bool must_use_password, const char *appname, char **err)
149148
{
150149
WalReceiverConn *conn;
151-
PostgresPollingStatusType status;
152150
const char *keys[6];
153151
const char *vals[6];
154152
int i = 0;
@@ -214,56 +212,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
214212
Assert(i < lengthof(keys));
215213

216214
conn = palloc0(sizeof(WalReceiverConn));
217-
conn->streamConn = PQconnectStartParams(keys, vals,
218-
/* expand_dbname = */ true);
219-
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
220-
goto bad_connection_errmsg;
221-
222-
/*
223-
* Poll connection until we have OK or FAILED status.
224-
*
225-
* Per spec for PQconnectPoll, first wait till socket is write-ready.
226-
*/
227-
status = PGRES_POLLING_WRITING;
228-
do
229-
{
230-
int io_flag;
231-
int rc;
232-
233-
if (status == PGRES_POLLING_READING)
234-
io_flag = WL_SOCKET_READABLE;
235-
#ifdef WIN32
236-
/* Windows needs a different test while waiting for connection-made */
237-
else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
238-
io_flag = WL_SOCKET_CONNECTED;
239-
#endif
240-
else
241-
io_flag = WL_SOCKET_WRITEABLE;
242-
243-
rc = WaitLatchOrSocket(MyLatch,
244-
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
245-
PQsocket(conn->streamConn),
246-
0,
247-
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
248-
249-
/* Interrupted? */
250-
if (rc & WL_LATCH_SET)
251-
{
252-
ResetLatch(MyLatch);
253-
ProcessWalRcvInterrupts();
254-
}
255-
256-
/* If socket is ready, advance the libpq state machine */
257-
if (rc & io_flag)
258-
status = PQconnectPoll(conn->streamConn);
259-
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
215+
conn->streamConn =
216+
libpqsrv_connect_params(keys, vals,
217+
/* expand_dbname = */ true,
218+
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
260219

261220
if (PQstatus(conn->streamConn) != CONNECTION_OK)
262221
goto bad_connection_errmsg;
263222

264223
if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
265224
{
266-
PQfinish(conn->streamConn);
225+
libpqsrv_disconnect(conn->streamConn);
267226
pfree(conn);
268227

269228
ereport(ERROR,
@@ -281,8 +240,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
281240
{
282241
PGresult *res;
283242

284-
res = libpqrcv_PQexec(conn->streamConn,
285-
ALWAYS_SECURE_SEARCH_PATH_SQL);
243+
res = libpqsrv_exec(conn->streamConn,
244+
ALWAYS_SECURE_SEARCH_PATH_SQL,
245+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
286246
if (PQresultStatus(res) != PGRES_TUPLES_OK)
287247
{
288248
PQclear(res);
@@ -303,7 +263,7 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
303263

304264
/* error path, error already set */
305265
bad_connection:
306-
PQfinish(conn->streamConn);
266+
libpqsrv_disconnect(conn->streamConn);
307267
pfree(conn);
308268
return NULL;
309269
}
@@ -454,7 +414,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
454414
* Get the system identifier and timeline ID as a DataRow message from the
455415
* primary server.
456416
*/
457-
res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
417+
res = libpqsrv_exec(conn->streamConn,
418+
"IDENTIFY_SYSTEM",
419+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
458420
if (PQresultStatus(res) != PGRES_TUPLES_OK)
459421
{
460422
PQclear(res);
@@ -631,7 +593,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
631593
options->proto.physical.startpointTLI);
632594

633595
/* Start streaming. */
634-
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
596+
res = libpqsrv_exec(conn->streamConn,
597+
cmd.data,
598+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
635599
pfree(cmd.data);
636600

637601
if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -661,7 +625,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
661625
PGresult *res;
662626

663627
/*
664-
* Send copy-end message. As in libpqrcv_PQexec, this could theoretically
628+
* Send copy-end message. As in libpqsrv_exec, this could theoretically
665629
* block, but the risk seems small.
666630
*/
667631
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -681,7 +645,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
681645
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
682646
* also possible in case we aborted the copy in mid-stream.
683647
*/
684-
res = libpqrcv_PQgetResult(conn->streamConn);
648+
res = libpqsrv_get_result(conn->streamConn,
649+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
685650
if (PQresultStatus(res) == PGRES_TUPLES_OK)
686651
{
687652
/*
@@ -696,7 +661,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
696661
PQclear(res);
697662

698663
/* the result set should be followed by CommandComplete */
699-
res = libpqrcv_PQgetResult(conn->streamConn);
664+
res = libpqsrv_get_result(conn->streamConn,
665+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
700666
}
701667
else if (PQresultStatus(res) == PGRES_COPY_OUT)
702668
{
@@ -710,7 +676,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
710676
pchomp(PQerrorMessage(conn->streamConn)))));
711677

712678
/* CommandComplete should follow */
713-
res = libpqrcv_PQgetResult(conn->streamConn);
679+
res = libpqsrv_get_result(conn->streamConn,
680+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
714681
}
715682

716683
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -721,7 +688,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
721688
PQclear(res);
722689

723690
/* Verify that there are no more results */
724-
res = libpqrcv_PQgetResult(conn->streamConn);
691+
res = libpqsrv_get_result(conn->streamConn,
692+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
725693
if (res != NULL)
726694
ereport(ERROR,
727695
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -746,7 +714,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
746714
* Request the primary to send over the history file for given timeline.
747715
*/
748716
snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
749-
res = libpqrcv_PQexec(conn->streamConn, cmd);
717+
res = libpqsrv_exec(conn->streamConn,
718+
cmd,
719+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
750720
if (PQresultStatus(res) != PGRES_TUPLES_OK)
751721
{
752722
PQclear(res);
@@ -776,114 +746,13 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
776746
PQclear(res);
777747
}
778748

779-
/*
780-
* Send a query and wait for the results by using the asynchronous libpq
781-
* functions and socket readiness events.
782-
*
783-
* The function is modeled on libpqsrv_exec(), with the behavior difference
784-
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
785-
* skips try/catch, since all errors terminate the process.
786-
*
787-
* May return NULL, rather than an error result, on failure.
788-
*/
789-
static PGresult *
790-
libpqrcv_PQexec(PGconn *streamConn, const char *query)
791-
{
792-
PGresult *lastResult = NULL;
793-
794-
/*
795-
* PQexec() silently discards any prior query results on the connection.
796-
* This is not required for this function as it's expected that the caller
797-
* (which is this library in all cases) will behave correctly and we don't
798-
* have to be backwards compatible with old libpq.
799-
*/
800-
801-
/*
802-
* Submit the query. Since we don't use non-blocking mode, this could
803-
* theoretically block. In practice, since we don't send very long query
804-
* strings, the risk seems negligible.
805-
*/
806-
if (!PQsendQuery(streamConn, query))
807-
return NULL;
808-
809-
for (;;)
810-
{
811-
/* Wait for, and collect, the next PGresult. */
812-
PGresult *result;
813-
814-
result = libpqrcv_PQgetResult(streamConn);
815-
if (result == NULL)
816-
break; /* query is complete, or failure */
817-
818-
/*
819-
* Emulate PQexec()'s behavior of returning the last result when there
820-
* are many. We are fine with returning just last error message.
821-
*/
822-
PQclear(lastResult);
823-
lastResult = result;
824-
825-
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
826-
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
827-
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
828-
PQstatus(streamConn) == CONNECTION_BAD)
829-
break;
830-
}
831-
832-
return lastResult;
833-
}
834-
835-
/*
836-
* Perform the equivalent of PQgetResult(), but watch for interrupts.
837-
*/
838-
static PGresult *
839-
libpqrcv_PQgetResult(PGconn *streamConn)
840-
{
841-
/*
842-
* Collect data until PQgetResult is ready to get the result without
843-
* blocking.
844-
*/
845-
while (PQisBusy(streamConn))
846-
{
847-
int rc;
848-
849-
/*
850-
* We don't need to break down the sleep into smaller increments,
851-
* since we'll get interrupted by signals and can handle any
852-
* interrupts here.
853-
*/
854-
rc = WaitLatchOrSocket(MyLatch,
855-
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
856-
WL_LATCH_SET,
857-
PQsocket(streamConn),
858-
0,
859-
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
860-
861-
/* Interrupted? */
862-
if (rc & WL_LATCH_SET)
863-
{
864-
ResetLatch(MyLatch);
865-
ProcessWalRcvInterrupts();
866-
}
867-
868-
/* Consume whatever data is available from the socket */
869-
if (PQconsumeInput(streamConn) == 0)
870-
{
871-
/* trouble; return NULL */
872-
return NULL;
873-
}
874-
}
875-
876-
/* Now we can collect and return the next PGresult */
877-
return PQgetResult(streamConn);
878-
}
879-
880749
/*
881750
* Disconnect connection to primary, if any.
882751
*/
883752
static void
884753
libpqrcv_disconnect(WalReceiverConn *conn)
885754
{
886-
PQfinish(conn->streamConn);
755+
libpqsrv_disconnect(conn->streamConn);
887756
PQfreemem(conn->recvBuf);
888757
pfree(conn);
889758
}
@@ -937,13 +806,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
937806
{
938807
PGresult *res;
939808

940-
res = libpqrcv_PQgetResult(conn->streamConn);
809+
res = libpqsrv_get_result(conn->streamConn,
810+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
941811
if (PQresultStatus(res) == PGRES_COMMAND_OK)
942812
{
943813
PQclear(res);
944814

945815
/* Verify that there are no more results. */
946-
res = libpqrcv_PQgetResult(conn->streamConn);
816+
res = libpqsrv_get_result(conn->streamConn,
817+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
947818
if (res != NULL)
948819
{
949820
PQclear(res);
@@ -1094,7 +965,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
1094965
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1095966
}
1096967

1097-
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
968+
res = libpqsrv_exec(conn->streamConn,
969+
cmd.data,
970+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
1098971
pfree(cmd.data);
1099972

1100973
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1147,7 +1020,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
11471020

11481021
appendStringInfoString(&cmd, " );");
11491022

1150-
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1023+
res = libpqsrv_exec(conn->streamConn, cmd.data,
1024+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
11511025
pfree(cmd.data);
11521026

11531027
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1214,7 +1088,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
12141088
{
12151089
char *cstrs[MaxTupleAttributeNumber];
12161090

1217-
ProcessWalRcvInterrupts();
1091+
CHECK_FOR_INTERRUPTS();
12181092

12191093
/* Do the allocations in temporary context. */
12201094
oldcontext = MemoryContextSwitchTo(rowcontext);
@@ -1260,7 +1134,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
12601134
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
12611135
errmsg("the query interface requires a database connection")));
12621136

1263-
pgres = libpqrcv_PQexec(conn->streamConn, query);
1137+
pgres = libpqsrv_exec(conn->streamConn,
1138+
query,
1139+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
12641140

12651141
switch (PQresultStatus(pgres))
12661142
{

0 commit comments

Comments
 (0)