Skip to content

Commit 728f86f

Browse files
committed
libpqwalreceiver: Convert to libpq-be-fe-helpers.h
In contrast to the changes to dblink and postgres_fdw, this does not fix a bug, as libpqwalreceiver did already process interrupts. Besides reducing code duplication, the conversion leads to libpqwalreceiver now using reserving file descriptors for libpq connections. While not strictly required for the use in walreceiver, we are also using libpqwalreceiver for logical replication, where it does seem more important. Even if we eventually decide to backpatch the prior commits, there'd be no need to backpatch this commit, due to not fixing an active bug. Reviewed-by: Thomas Munro <thomas.munro@gmail.com> Discussion: https://postgr.es/m/20220925232237.p6uskba2dw6fnwj2@awork3.anarazel.de
1 parent e460248 commit 728f86f

File tree

1 file changed

+7
-46
lines changed

1 file changed

+7
-46
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "common/connect.h"
2525
#include "funcapi.h"
2626
#include "libpq-fe.h"
27+
#include "libpq/libpq-be-fe-helpers.h"
2728
#include "mb/pg_wchar.h"
2829
#include "miscadmin.h"
2930
#include "pgstat.h"
@@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
125126
char **err)
126127
{
127128
WalReceiverConn *conn;
128-
PostgresPollingStatusType status;
129129
const char *keys[6];
130130
const char *vals[6];
131131
int i = 0;
@@ -172,49 +172,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
172172
Assert(i < sizeof(keys));
173173

174174
conn = palloc0(sizeof(WalReceiverConn));
175-
conn->streamConn = PQconnectStartParams(keys, vals,
176-
/* expand_dbname = */ true);
177-
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
178-
goto bad_connection_errmsg;
179-
180-
/*
181-
* Poll connection until we have OK or FAILED status.
182-
*
183-
* Per spec for PQconnectPoll, first wait till socket is write-ready.
184-
*/
185-
status = PGRES_POLLING_WRITING;
186-
do
187-
{
188-
int io_flag;
189-
int rc;
190-
191-
if (status == PGRES_POLLING_READING)
192-
io_flag = WL_SOCKET_READABLE;
193-
#ifdef WIN32
194-
/* Windows needs a different test while waiting for connection-made */
195-
else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
196-
io_flag = WL_SOCKET_CONNECTED;
197-
#endif
198-
else
199-
io_flag = WL_SOCKET_WRITEABLE;
200-
201-
rc = WaitLatchOrSocket(MyLatch,
202-
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
203-
PQsocket(conn->streamConn),
204-
0,
205-
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
206-
207-
/* Interrupted? */
208-
if (rc & WL_LATCH_SET)
209-
{
210-
ResetLatch(MyLatch);
211-
ProcessWalRcvInterrupts();
212-
}
213-
214-
/* If socket is ready, advance the libpq state machine */
215-
if (rc & io_flag)
216-
status = PQconnectPoll(conn->streamConn);
217-
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
175+
conn->streamConn =
176+
libpqsrv_connect_params(keys, vals,
177+
/* expand_dbname = */ true,
178+
WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
218179

219180
if (PQstatus(conn->streamConn) != CONNECTION_OK)
220181
goto bad_connection_errmsg;
@@ -245,7 +206,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
245206

246207
/* error path, error already set */
247208
bad_connection:
248-
PQfinish(conn->streamConn);
209+
libpqsrv_disconnect(conn->streamConn);
249210
pfree(conn);
250211
return NULL;
251212
}
@@ -744,7 +705,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
744705
static void
745706
libpqrcv_disconnect(WalReceiverConn *conn)
746707
{
747-
PQfinish(conn->streamConn);
708+
libpqsrv_disconnect(conn->streamConn);
748709
PQfreemem(conn->recvBuf);
749710
pfree(conn);
750711
}

0 commit comments

Comments
 (0)