Skip to content

Commit 6f7cddc

Browse files
committed
Now that START_REPLICATION returns the next timeline's ID after reaching end
of timeline, take advantage of that in walreceiver. The startup process is still in control of choosing the target timeline, by scanning the timeline history files present in pg_xlog, but walreceiver now uses the next timeline's ID to fetch its history file immediately after it has finished streaming the old timeline. Before, the standby would first try to restart streaming on the old timeline, which fetches the missing timeline history file as a side-effect, and only then restart from the new timeline. This patch eliminates the extra iteration, which speeds up the timeline switch and reduces the noise in the log caused by the extra restart on the old timeline.
1 parent 2ff6555 commit 6f7cddc

File tree

3 files changed

+46
-29
lines changed

3 files changed

+46
-29
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static void libpqrcv_connect(char *conninfo);
5050
static void libpqrcv_identify_system(TimeLineID *primary_tli);
5151
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
5252
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
53-
static void libpqrcv_endstreaming(void);
53+
static void libpqrcv_endstreaming(TimeLineID *next_tli);
5454
static int libpqrcv_receive(int timeout, char **buffer);
5555
static void libpqrcv_send(const char *buffer, int nbytes);
5656
static void libpqrcv_disconnect(void);
@@ -199,10 +199,11 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
199199
}
200200

201201
/*
202-
* Stop streaming WAL data.
202+
* Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
203+
* reported by the server, or 0 if it did not report it.
203204
*/
204205
static void
205-
libpqrcv_endstreaming(void)
206+
libpqrcv_endstreaming(TimeLineID *next_tli)
206207
{
207208
PGresult *res;
208209

@@ -211,33 +212,42 @@ libpqrcv_endstreaming(void)
211212
(errmsg("could not send end-of-streaming message to primary: %s",
212213
PQerrorMessage(streamConn))));
213214

214-
/* Read the command result after COPY is finished */
215-
216-
while ((res = PQgetResult(streamConn)) != NULL)
215+
/*
216+
* After COPY is finished, we should receive a result set indicating the
217+
* next timeline's ID, or just CommandComplete if the server was shut down.
218+
*
219+
* If we had not yet received CopyDone from the backend, PGRES_COPY_IN
220+
* would also be possible. However, at the moment this function is only
221+
* called after receiving CopyDone from the backend - the walreceiver
222+
* never terminates replication on its own initiative.
223+
*/
224+
res = PQgetResult(streamConn);
225+
if (PQresultStatus(res) == PGRES_TUPLES_OK)
217226
{
218-
/*
219-
* After Copy, if the streaming ended because we reached end of the
220-
* timeline, server sends one result set with the next timeline's ID.
221-
* We don't need it, so just slurp and ignore it.
222-
*
223-
* If we had not yet received CopyDone from the backend, PGRES_COPY_IN
224-
* is also possible. However, at the moment this function is only
225-
* called after receiving CopyDone from the backend - the walreceiver
226-
* never terminates replication on its own initiative.
227-
*/
228-
switch (PQresultStatus(res))
229-
{
230-
case PGRES_COMMAND_OK:
231-
case PGRES_TUPLES_OK:
232-
break;
233-
234-
default:
235-
ereport(ERROR,
236-
(errmsg("error reading result of streaming command: %s",
237-
PQerrorMessage(streamConn))));
238-
}
227+
/* Read the next timeline's ID */
228+
if (PQnfields(res) != 1 || PQntuples(res) != 1)
229+
ereport(ERROR,
230+
(errmsg("unexpected result set after end-of-streaming")));
231+
*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
239232
PQclear(res);
233+
234+
/* the result set should be followed by CommandComplete */
235+
res = PQgetResult(streamConn);
240236
}
237+
else
238+
*next_tli = 0;
239+
240+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
241+
ereport(ERROR,
242+
(errmsg("error reading result of streaming command: %s",
243+
PQerrorMessage(streamConn))));
244+
245+
/* Verify that there are no more results */
246+
res = PQgetResult(streamConn);
247+
if (res != NULL)
248+
ereport(ERROR,
249+
(errmsg("unexpected result after CommandComplete: %s",
250+
PQerrorMessage(streamConn))));
241251
}
242252

243253
/*

src/backend/replication/walreceiver.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,8 +505,15 @@ WalReceiverMain(void)
505505
* our side, too.
506506
*/
507507
EnableWalRcvImmediateExit();
508-
walrcv_endstreaming();
508+
walrcv_endstreaming(&primaryTLI);
509509
DisableWalRcvImmediateExit();
510+
511+
/*
512+
* If the server had switched to a new timeline that we didn't know
513+
* about when we began streaming, fetch its timeline history file
514+
* now.
515+
*/
516+
WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
510517
}
511518
else
512519
ereport(LOG,

src/include/replication/walreceiver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistor
128128
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
129129
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
130130

131-
typedef void (*walrcv_endstreaming_type) (void);
131+
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
132132
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
133133

134134
typedef int (*walrcv_receive_type) (int timeout, char **buffer);

0 commit comments

Comments
 (0)