Skip to content

Commit 1e8a850

Browse files
committed
Use asynchronous connect API in libpqwalreceiver
This makes the connection attempt from CREATE SUBSCRIPTION and from WalReceiver interruptable by the user in case the libpq connection is hanging. The previous coding required immediate shutdown (SIGQUIT) of PostgreSQL in that situation. From: Petr Jelinek <petr.jelinek@2ndquadrant.com> Tested-by: Thom Brown <thom@linux.com>
1 parent 9eb344f commit 1e8a850

File tree

3 files changed

+50
-5
lines changed

3 files changed

+50
-5
lines changed

src/backend/postmaster/pgstat.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w)
33403340
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
33413341
event_name = "WalReceiverWaitStart";
33423342
break;
3343-
case WAIT_EVENT_LIBPQWALRECEIVER_READ:
3344-
event_name = "LibPQWalReceiverRead";
3343+
case WAIT_EVENT_LIBPQWALRECEIVER:
3344+
event_name = "LibPQWalReceiver";
33453345
break;
33463346
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
33473347
event_name = "WalSenderWaitForWAL";

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
113113
char **err)
114114
{
115115
WalReceiverConn *conn;
116+
PostgresPollingStatusType status;
116117
const char *keys[5];
117118
const char *vals[5];
118119
int i = 0;
@@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
146147
Assert(i < sizeof(keys));
147148

148149
conn = palloc0(sizeof(WalReceiverConn));
149-
conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
150+
conn->streamConn = PQconnectStartParams(keys, vals,
151+
/* expand_dbname = */ true);
152+
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
153+
{
154+
*err = pchomp(PQerrorMessage(conn->streamConn));
155+
return NULL;
156+
}
157+
158+
/* Poll connection. */
159+
do
160+
{
161+
/* Determine current state of the connection. */
162+
status = PQconnectPoll(conn->streamConn);
163+
164+
/* Sleep a bit if waiting for socket. */
165+
if (status == PGRES_POLLING_READING ||
166+
status == PGRES_POLLING_WRITING)
167+
{
168+
int extra_flag;
169+
int rc;
170+
171+
extra_flag = (status == PGRES_POLLING_READING
172+
? WL_SOCKET_READABLE
173+
: WL_SOCKET_WRITEABLE);
174+
175+
ResetLatch(&MyProc->procLatch);
176+
rc = WaitLatchOrSocket(&MyProc->procLatch,
177+
WL_POSTMASTER_DEATH |
178+
WL_LATCH_SET | extra_flag,
179+
PQsocket(conn->streamConn),
180+
0,
181+
WAIT_EVENT_LIBPQWALRECEIVER);
182+
183+
/* Emergency bailout. */
184+
if (rc & WL_POSTMASTER_DEATH)
185+
exit(1);
186+
187+
/* Interrupted. */
188+
if (rc & WL_LATCH_SET)
189+
CHECK_FOR_INTERRUPTS();
190+
}
191+
192+
/* Otherwise loop until we have OK or FAILED status. */
193+
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
194+
150195
if (PQstatus(conn->streamConn) != CONNECTION_OK)
151196
{
152197
*err = pchomp(PQerrorMessage(conn->streamConn));
@@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
529574
WL_LATCH_SET,
530575
PQsocket(streamConn),
531576
0,
532-
WAIT_EVENT_LIBPQWALRECEIVER_READ);
577+
WAIT_EVENT_LIBPQWALRECEIVER);
533578
if (rc & WL_POSTMASTER_DEATH)
534579
exit(1);
535580

src/include/pgstat.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ typedef enum
764764
WAIT_EVENT_CLIENT_WRITE,
765765
WAIT_EVENT_SSL_OPEN_SERVER,
766766
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
767-
WAIT_EVENT_LIBPQWALRECEIVER_READ,
767+
WAIT_EVENT_LIBPQWALRECEIVER,
768768
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
769769
WAIT_EVENT_WAL_SENDER_WRITE_DATA
770770
} WaitEventClient;

0 commit comments

Comments
 (0)