Commit 06c418e

Implement pg_wal_replay_wait() stored procedure
pg_wal_replay_wait() is to be used on standby and specifies waiting for
the specific WAL location to be replayed before starting the transaction.
This option is useful when the user makes some data changes on primary and
needs a guarantee to see these changes on standby.

The queue of waiters is stored in the shared memory array sorted by LSN.
During replay of WAL waiters whose LSNs are already replayed are deleted from
the shared memory array and woken up by setting of their latches.

pg_wal_replay_wait() needs to wait without any snapshot held. Otherwise,
the snapshot could prevent the replay of WAL records implying a kind of
self-deadlock. This is why it is only possible to implement
pg_wal_replay_wait() as a procedure working in a non-atomic context,
not a function.

Catversion is bumped.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveira
1 parent 6faca9a commit 06c418e
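
The waiter queue described in the commit message can be pictured with a minimal, single-process sketch. This is not the code from waitlsn.c: the names Waiter, WaitQueue, queue_add and queue_wake, the fixed capacity, and the plain boolean standing in for a latch are all assumptions made for illustration. Shared memory and locking are left out entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_WAITERS 8           /* hypothetical fixed capacity */

/* Hypothetical waiter entry; *woken stands in for setting a latch. */
typedef struct Waiter
{
    uint64_t    lsn;            /* LSN this waiter wants replayed */
    int         id;             /* illustrative backend identifier */
    bool       *woken;          /* set to true when the waiter is released */
} Waiter;

typedef struct WaitQueue
{
    size_t      n;              /* number of entries in use */
    Waiter      items[MAX_WAITERS];
} WaitQueue;

/* Insert a waiter, keeping the array sorted by ascending LSN. */
static bool
queue_add(WaitQueue *q, Waiter w)
{
    size_t      i;

    if (q->n == MAX_WAITERS)
        return false;

    /* Find the first entry with a larger LSN, then shift the tail right. */
    for (i = 0; i < q->n && q->items[i].lsn <= w.lsn; i++)
        ;
    memmove(&q->items[i + 1], &q->items[i], (q->n - i) * sizeof(Waiter));
    q->items[i] = w;
    q->n++;
    return true;
}

/*
 * Release every waiter whose LSN is at or below "replayed".  Because the
 * array is sorted, those waiters form a prefix that is removed in one step.
 * Returns the number of waiters released.
 */
static size_t
queue_wake(WaitQueue *q, uint64_t replayed)
{
    size_t      k = 0;

    while (k < q->n && q->items[k].lsn <= replayed)
    {
        *q->items[k].woken = true;
        k++;
    }
    memmove(&q->items[0], &q->items[k], (q->n - k) * sizeof(Waiter));
    q->n -= k;
    assert(q->n <= MAX_WAITERS);
    return k;
}
```

Keeping the array sorted means the replay side only has to inspect entries until it meets the first LSN that has not been replayed yet.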

16 files changed: 648 additions, 2 deletions


doc/src/sgml/func.sgml

Lines changed: 113 additions & 0 deletions
@@ -28284,6 +28284,119 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
     the pause, the rate of WAL generation and available disk space.
    </para>

+   <para>
+    There are also procedures to control the progress of recovery.
+    They are shown in <xref linkend="procedures-recovery-control-table"/>.
+    These procedures may be executed only during recovery.
+   </para>
+
+   <table id="procedures-recovery-control-table">
+    <title>Recovery Control Procedures</title>
+    <tgroup cols="1">
+     <thead>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        Procedure
+       </para>
+       <para>
+        Description
+       </para></entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_wal_replay_wait</primary>
+        </indexterm>
+        <function>pg_wal_replay_wait</function> (
+        <parameter>target_lsn</parameter> <type>pg_lsn</type>,
+        <parameter>timeout</parameter> <type>bigint</type> <literal>DEFAULT</literal> <literal>0</literal>)
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        If <parameter>timeout</parameter> is not specified or zero, this
+        procedure returns once WAL is replayed upto
+        <literal>target_lsn</literal>.
+        If the <parameter>timeout</parameter> is specified (in
+        milliseconds) and greater than zero, the procedure waits until the
+        server actually replays the WAL upto <literal>target_lsn</literal> or
+        until the given time has passed. On timeout, an error is emitted.
+       </para></entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <para>
+    <function>pg_wal_replay_wait</function> waits till
+    <parameter>target_lsn</parameter> to be replayed on standby.
+    That is, after this function execution, the value returned by
+    <function>pg_last_wal_replay_lsn</function> should be greater or equal
+    to the <parameter>target_lsn</parameter> value. This is useful to achieve
+    read-your-writes-consistency, while using async replica for reads and
+    primary for writes. In that case <acronym>lsn</acronym> of the last
+    modification should be stored on the client application side or the
+    connection pooler side.
+   </para>
+
+   <para>
+    You can use <function>pg_wal_replay_wait</function> to wait for
+    the <type>pg_lsn</type> value. For example, an application could update
+    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+    changes just made. This example uses <function>pg_current_wal_insert_lsn</function>
+    on primary server to get the <acronym>lsn</acronym> given that
+    <varname>synchronous_commit</varname> could be set to
+    <literal>off</literal>.
+
+    <programlisting>
+postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+UPDATE 100
+postgres=# SELECT pg_current_wal_insert_lsn();
+pg_current_wal_insert_lsn
+--------------------
+0/306EE20
+(1 row)
+    </programlisting>
+
+    Then an application could run <function>pg_wal_replay_wait</function>
+    with the <acronym>lsn</acronym> obtained from primary. After that the
+    changes made of primary should be guaranteed to be visible on replica.
+
+    <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20');
+CALL
+postgres=# SELECT * FROM movie WHERE genre = 'Drama';
+genre
+-------
+(0 rows)
+    </programlisting>
+
+    It may also happen that target <acronym>lsn</acronym> is not achieved
+    within the timeout. In that case the error is thrown.
+
+    <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20', 100);
+ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
+    </programlisting>
+
+   </para>
+
+   <para>
+    <function>pg_wal_replay_wait</function> can't be used within
+    the transaction, another procedure or function. Any of them holds a
+    snapshot, which could prevent the replay of WAL records.
+
+    <programlisting>
+postgres=# BEGIN;
+BEGIN
+postgres=*# CALL pg_wal_replay_wait('0/306EE20');
+ERROR: pg_wal_replay_wait() must be only called in non-atomic context
+DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.
+    </programlisting>
+
+   </para>
   </sect2>

   <sect2 id="functions-snapshot-synchronization">
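
The timeout error in the func.sgml example above reports two LSNs in the textual pg_lsn form: two hexadecimal numbers separated by a slash, holding the high and low 32 bits of a 64-bit WAL position. The sketch below shows how such values can be compared on the client side. parse_lsn is a hypothetical helper written for this illustration, not a PostgreSQL or libpq function, and it is more lenient than the server's own parser (sscanf accepts leading whitespace, for instance).

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: convert "hi/lo" hexadecimal text into a 64-bit LSN.
 * Returns false when the text does not consist of exactly two hex numbers
 * separated by a slash.
 */
static bool
parse_lsn(const char *text, uint64_t *out)
{
    uint32_t    hi;
    uint32_t    lo;
    char        trailing;

    assert(text != NULL && out != NULL);

    /* The extra %c only matches if unexpected characters follow the LSN. */
    if (sscanf(text, "%" SCNx32 "/%" SCNx32 "%c", &hi, &lo, &trailing) != 2)
        return false;

    *out = ((uint64_t) hi << 32) | lo;
    return true;
}
```

Parsed this way, the replay position 0/306EA60 from the error message compares lower than the target 0/306EE20, which is why the 100 ms call timed out instead of returning.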

src/backend/access/transam/xlog.c

Lines changed: 7 additions & 0 deletions
@@ -66,6 +66,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "commands/waitlsn.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
 #include "executor/instrument.h"
@@ -6040,6 +6041,12 @@ StartupXLOG(void)
 	UpdateControlFile();
 	LWLockRelease(ControlFileLock);

+	/*
+	 * Wake up all waiters for replay LSN. They need to report an error that
+	 * recovery was ended before achieving the target LSN.
+	 */
+	WaitLSNSetLatches(InvalidXLogRecPtr);
+
 	/*
 	 * Shutdown the recovery environment. This must occur after
 	 * RecoverPreparedTransactions() (see notes in lock_twophase_recover())

src/backend/access/transam/xlogrecovery.c

Lines changed: 11 additions & 0 deletions
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,16 @@ PerformWalRecovery(void)
 					break;
 				}

+			/*
+			 * If we replayed an LSN that someone was waiting for then walk
+			 * over the shared memory array and set latches to notify the
+			 * waiters.
+			 */
+			if (waitLSN &&
+				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
+				 pg_atomic_read_u64(&waitLSN->minLSN)))
+				WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
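
The check added to PerformWalRecovery() reads an atomic minimum LSN before doing anything else, so a replay step with no satisfied waiter costs one load and one comparison. The sketch below models that fast path with C11 atomics. WaitState, wait_state_init, wait_state_set_min and should_wake are hypothetical names, and using UINT64_MAX to mean "nobody is waiting" is an assumption: the hunks shown here do not reveal how waitlsn.c initializes or maintains minLSN.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed sentinel meaning that no backend is currently waiting. */
#define NO_WAITERS UINT64_MAX

/* Hypothetical stand-in for the shared state that holds minLSN. */
typedef struct WaitState
{
    _Atomic uint64_t min_lsn;   /* smallest LSN any waiter is waiting for */
} WaitState;

static void
wait_state_init(WaitState *ws)
{
    assert(ws != NULL);
    atomic_init(&ws->min_lsn, NO_WAITERS);
}

/* Publish a new minimum, for example after a waiter registers itself. */
static void
wait_state_set_min(WaitState *ws, uint64_t lsn)
{
    assert(ws != NULL);
    atomic_store(&ws->min_lsn, lsn);
}

/*
 * Mirror of the condition in the diff: only walk the waiter array when the
 * state exists and replay has reached the smallest awaited LSN.
 */
static bool
should_wake(WaitState *ws, uint64_t last_replayed_end)
{
    if (ws == NULL)
        return false;
    return last_replayed_end >= atomic_load(&ws->min_lsn);
}
```

With the sentinel in place, should_wake stays false for every realistic replay position until a waiter publishes its target.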

src/backend/catalog/system_functions.sql

Lines changed: 3 additions & 0 deletions
@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
   json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
   RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE;

+CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0)
+  LANGUAGE internal AS 'pg_wal_replay_wait';
+
 CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
     IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
     OUT lsn pg_lsn, OUT xid xid, OUT data text)

src/backend/commands/Makefile

Lines changed: 2 additions & 1 deletion
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	waitlsn.o

 include $(top_srcdir)/src/backend/common.mk

src/backend/commands/meson.build

Lines changed: 1 addition & 0 deletions
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
