|
24 | 24 | #include "common/connect.h"
|
25 | 25 | #include "funcapi.h"
|
26 | 26 | #include "libpq-fe.h"
|
| 27 | +#include "libpq/libpq-be-fe-helpers.h" |
27 | 28 | #include "mb/pg_wchar.h"
|
28 | 29 | #include "miscadmin.h"
|
29 | 30 | #include "pgstat.h"
|
@@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
|
125 | 126 | char **err)
|
126 | 127 | {
|
127 | 128 | WalReceiverConn *conn;
|
128 |
| - PostgresPollingStatusType status; |
129 | 129 | const char *keys[6];
|
130 | 130 | const char *vals[6];
|
131 | 131 | int i = 0;
|
@@ -172,49 +172,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
|
172 | 172 | Assert(i < sizeof(keys));
|
173 | 173 |
|
174 | 174 | conn = palloc0(sizeof(WalReceiverConn));
|
175 |
| - conn->streamConn = PQconnectStartParams(keys, vals, |
176 |
| - /* expand_dbname = */ true); |
177 |
| - if (PQstatus(conn->streamConn) == CONNECTION_BAD) |
178 |
| - goto bad_connection_errmsg; |
179 |
| - |
180 |
| - /* |
181 |
| - * Poll connection until we have OK or FAILED status. |
182 |
| - * |
183 |
| - * Per spec for PQconnectPoll, first wait till socket is write-ready. |
184 |
| - */ |
185 |
| - status = PGRES_POLLING_WRITING; |
186 |
| - do |
187 |
| - { |
188 |
| - int io_flag; |
189 |
| - int rc; |
190 |
| - |
191 |
| - if (status == PGRES_POLLING_READING) |
192 |
| - io_flag = WL_SOCKET_READABLE; |
193 |
| -#ifdef WIN32 |
194 |
| - /* Windows needs a different test while waiting for connection-made */ |
195 |
| - else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) |
196 |
| - io_flag = WL_SOCKET_CONNECTED; |
197 |
| -#endif |
198 |
| - else |
199 |
| - io_flag = WL_SOCKET_WRITEABLE; |
200 |
| - |
201 |
| - rc = WaitLatchOrSocket(MyLatch, |
202 |
| - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, |
203 |
| - PQsocket(conn->streamConn), |
204 |
| - 0, |
205 |
| - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); |
206 |
| - |
207 |
| - /* Interrupted? */ |
208 |
| - if (rc & WL_LATCH_SET) |
209 |
| - { |
210 |
| - ResetLatch(MyLatch); |
211 |
| - ProcessWalRcvInterrupts(); |
212 |
| - } |
213 |
| - |
214 |
| - /* If socket is ready, advance the libpq state machine */ |
215 |
| - if (rc & io_flag) |
216 |
| - status = PQconnectPoll(conn->streamConn); |
217 |
| - } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); |
| 175 | + conn->streamConn = |
| 176 | + libpqsrv_connect_params(keys, vals, |
| 177 | + /* expand_dbname = */ true, |
| 178 | + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); |
218 | 179 |
|
219 | 180 | if (PQstatus(conn->streamConn) != CONNECTION_OK)
|
220 | 181 | goto bad_connection_errmsg;
|
@@ -245,7 +206,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
|
245 | 206 |
|
246 | 207 | /* error path, error already set */
|
247 | 208 | bad_connection:
|
248 |
| - PQfinish(conn->streamConn); |
| 209 | + libpqsrv_disconnect(conn->streamConn); |
249 | 210 | pfree(conn);
|
250 | 211 | return NULL;
|
251 | 212 | }
|
@@ -744,7 +705,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
|
744 | 705 | static void
|
745 | 706 | libpqrcv_disconnect(WalReceiverConn *conn)
|
746 | 707 | {
|
747 |
| - PQfinish(conn->streamConn); |
| 708 | + libpqsrv_disconnect(conn->streamConn); |
748 | 709 | PQfreemem(conn->recvBuf);
|
749 | 710 | pfree(conn);
|
750 | 711 | }
|
|
0 commit comments