Skip to content

Commit c657bc6

Browse files
committed
Apply WaitLSN patch (PGPRO-90)
1 parent 2de2911 commit c657bc6

File tree

14 files changed

+395
-5
lines changed

14 files changed

+395
-5
lines changed

doc/src/sgml/ref/allfiles.sgml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
172172
<!ENTITY update SYSTEM "update.sgml">
173173
<!ENTITY vacuum SYSTEM "vacuum.sgml">
174174
<!ENTITY values SYSTEM "values.sgml">
175+
<!ENTITY waitlsn SYSTEM "waitlsn.sgml">
175176

176177
<!-- applications and utilities -->
177178
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">

doc/src/sgml/ref/waitlsn.sgml

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<!--
2+
doc/src/sgml/ref/waitlsn.sgml
3+
PostgreSQL documentation
4+
-->
5+
6+
<refentry id="SQL-WAITLSN">
7+
<indexterm zone="sql-waitlsn">
8+
<primary>WAITLSN</primary>
9+
</indexterm>
10+
11+
<refmeta>
12+
<refentrytitle>WAITLSN</refentrytitle>
13+
<manvolnum>7</manvolnum>
14+
<refmiscinfo>SQL - Language Statements</refmiscinfo>
15+
</refmeta>
16+
17+
<refnamediv>
18+
<refname>WAITLSN</refname>
19+
<refpurpose>wait when target <acronym>LSN</> been replayed</refpurpose>
20+
</refnamediv>
21+
22+
<refsynopsisdiv>
23+
<synopsis>
24+
WAITLSN <replaceable class="PARAMETER">'LSN'</replaceable> [ , <replaceable class="PARAMETER">delay</replaceable> ]
25+
</synopsis>
26+
</refsynopsisdiv>
27+
28+
<refsect1>
29+
<title>Description</title>
30+
31+
<para>
32+
The <command>WAITLSN</command> wait till target <acronym>LSN</> will
33+
be replayed with an optional <quote>delay</> (milliseconds by default
34+
infinity) to be wait for LSN to replayed.
35+
</para>
36+
37+
<para>
38+
<command>WAITLSN</command> provides a simple
39+
interprocess <acronym>LSN</> wait mechanism for a backends on slave
40+
in master-slave replication scheme on <productname>PostgreSQL</productname> database.
41+
</para>
42+
</refsect1>
43+
44+
<refsect1>
45+
<title>Parameters</title>
46+
47+
<variablelist>
48+
<varlistentry>
49+
<term><replaceable class="PARAMETER">LSN</replaceable></term>
50+
<listitem>
51+
<para>
52+
Target log sequence number to be wait for.
53+
</para>
54+
</listitem>
55+
</varlistentry>
56+
<varlistentry>
57+
<term><replaceable class="PARAMETER">delay</replaceable></term>
58+
<listitem>
59+
<para>
60+
Time in miliseconds to waiting for LSN to be replayed.
61+
</para>
62+
</listitem>
63+
</varlistentry>
64+
</variablelist>
65+
</refsect1>
66+
67+
<refsect1>
68+
<title>Notes</title>
69+
70+
<para>
71+
Delay time to waiting for LSN to be replayed must be integer. For
72+
default it is infinity. Waiting can be interupped using Ctl+C, or
73+
by Postmaster death.
74+
</para>
75+
</refsect1>
76+
77+
<refsect1>
78+
<title>Examples</title>
79+
80+
<para>
81+
Configure and execute a waitlsn from
82+
<application>psql</application>:
83+
84+
<programlisting>
85+
WAITLSN '0/3F07A6B1', 10000;
86+
NOTICE: LSN is not reached. Try to make bigger delay.
87+
WAITLSN
88+
89+
WAITLSN '0/3F07A611';
90+
WAITLSN
91+
92+
WAITLSN '0/3F0FF791', 500000;
93+
^CCancel request sent
94+
NOTICE: LSN is not reached. Try to make bigger delay.
95+
ERROR: canceling statement due to user request
96+
</programlisting>
97+
</para>
98+
</refsect1>
99+
100+
<refsect1>
101+
<title>Compatibility</title>
102+
103+
<para>
104+
There is no <command>WAITLSN</command> statement in the SQL
105+
standard.
106+
</para>
107+
</refsect1>
108+
</refentry>

doc/src/sgml/reference.sgml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@
200200
&update;
201201
&vacuum;
202202
&values;
203+
&waitlsn;
203204

204205
</reference>
205206

src/backend/access/transam/xlog.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "catalog/pg_control.h"
4040
#include "catalog/pg_database.h"
4141
#include "commands/tablespace.h"
42+
#include "commands/waitlsn.h"
4243
#include "miscadmin.h"
4344
#include "pgstat.h"
4445
#include "postmaster/bgwriter.h"
@@ -6949,6 +6950,11 @@ StartupXLOG(void)
69496950
break;
69506951
}
69516952

6953+
/*
6954+
* After update lastReplayedEndRecPtr set Latches in SHMEM array
6955+
*/
6956+
WaitLSNSetLatch();
6957+
69526958
/* Else, try to fetch the next WAL record */
69536959
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
69546960
} while (record != NULL);

src/backend/commands/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
2020
policy.o portalcmds.o prepare.o proclang.o \
2121
schemacmds.o seclabel.o sequence.o tablecmds.o tablespace.o trigger.o \
2222
tsearchcmds.o typecmds.o user.o vacuum.o vacuumlazy.o \
23-
variable.o view.o
23+
variable.o view.o waitlsn.o
2424

2525
include $(top_srcdir)/src/backend/common.mk

src/backend/commands/async.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@
139139
#include "utils/ps_status.h"
140140
#include "utils/timestamp.h"
141141

142-
143142
/*
144143
* Maximum size of a NOTIFY payload, including terminating NULL. This
145144
* must be kept small enough so that a notification message fits on one

src/backend/commands/waitlsn.c

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* waitlsn.c
4+
* WaitLSN statment: WAITLSN
5+
*
6+
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
* IDENTIFICATION
10+
* src/backend/commands/waitlsn.c
11+
*
12+
*-------------------------------------------------------------------------
13+
*/
14+
15+
/*-------------------------------------------------------------------------
16+
* Wait for LSN been replayed on slave as of 9.5:
17+
* README
18+
*-------------------------------------------------------------------------
19+
*/
20+
21+
#include "postgres.h"
22+
#include "fmgr.h"
23+
#include "utils/pg_lsn.h"
24+
#include "storage/latch.h"
25+
#include "miscadmin.h"
26+
#include "storage/spin.h"
27+
#include "storage/backendid.h"
28+
#include "access/xact.h"
29+
#include "storage/shmem.h"
30+
#include "storage/ipc.h"
31+
#include "access/xlog_fn.h"
32+
#include "utils/timestamp.h"
33+
#include "storage/pmsignal.h"
34+
#include "access/xlog.h"
35+
#include "access/xlogdefs.h"
36+
#include "commands/waitlsn.h"
37+
38+
39+
/* Latches Own-DisownLatch and AbortCаllBack */
40+
static uint32 GetSHMEMSize(void);
41+
static void WLDisownLatchAbort(XactEvent event, void *arg);
42+
static void WLOwnLatch(void);
43+
static void WLDisownLatch(void);
44+
45+
void _PG_init(void);
46+
47+
/* Shared memory structures */
48+
typedef struct
49+
{
50+
int pid;
51+
volatile slock_t slock;
52+
Latch latch;
53+
} BIDLatch;
54+
55+
typedef struct
56+
{
57+
char dummy;
58+
BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER];
59+
} GlobState;
60+
61+
static volatile GlobState *state;
62+
bool is_latch_owned = false;
63+
64+
static void
65+
WLOwnLatch(void)
66+
{
67+
SpinLockAcquire(&state->l_arr[MyBackendId].slock);
68+
OwnLatch(&state->l_arr[MyBackendId].latch);
69+
is_latch_owned = true;
70+
state->l_arr[MyBackendId].pid = MyProcPid;
71+
SpinLockRelease(&state->l_arr[MyBackendId].slock);
72+
}
73+
74+
static void
75+
WLDisownLatch(void)
76+
{
77+
SpinLockAcquire(&state->l_arr[MyBackendId].slock);
78+
DisownLatch(&state->l_arr[MyBackendId].latch);
79+
is_latch_owned = false;
80+
state->l_arr[MyBackendId].pid = 0;
81+
SpinLockRelease(&state->l_arr[MyBackendId].slock);
82+
}
83+
84+
/* CallBack function */
85+
static void
86+
WLDisownLatchAbort(XactEvent event, void *arg)
87+
{
88+
if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT ||
89+
event == XACT_EVENT_ABORT))
90+
{
91+
WLDisownLatch();
92+
}
93+
}
94+
95+
/* Module load callback */
96+
void
97+
_PG_init(void)
98+
{
99+
if (!IsUnderPostmaster)
100+
RegisterXactCallback(WLDisownLatchAbort, NULL);
101+
}
102+
103+
static uint32
104+
GetSHMEMSize(void)
105+
{
106+
return offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1);
107+
}
108+
109+
void
110+
WaitLSNShmemInit(void)
111+
{
112+
bool found;
113+
uint i;
114+
115+
state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
116+
GetSHMEMSize(),
117+
&found);
118+
if (!found)
119+
{
120+
for (i = 0; i < (MaxConnections+1); i++)
121+
{
122+
state->l_arr[i].pid = 0;
123+
SpinLockInit(&state->l_arr[i].slock);
124+
InitSharedLatch(&state->l_arr[i].latch);
125+
}
126+
}
127+
}
128+
129+
void
130+
WaitLSNSetLatch(void)
131+
{
132+
uint i;
133+
for (i = 0; i < (MaxConnections+1); i++)
134+
{
135+
SpinLockAcquire(&state->l_arr[i].slock);
136+
if (state->l_arr[i].pid != 0)
137+
SetLatch(&state->l_arr[i].latch);
138+
SpinLockRelease(&state->l_arr[i].slock);
139+
}
140+
}
141+
142+
void
143+
WaitLSNUtility(const char *lsn, const int *delay)
144+
{
145+
XLogRecPtr trg_lsn;
146+
XLogRecPtr cur_lsn;
147+
int latch_events;
148+
int tdelay = delay;
149+
TimestampTz timer = GetCurrentTimestamp();
150+
trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
151+
152+
tdelay *= 1000;
153+
154+
if (delay > 0)
155+
latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
156+
else
157+
latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
158+
159+
WLOwnLatch();
160+
161+
for (;;)
162+
{
163+
ResetLatch(&state->l_arr[MyBackendId].latch);
164+
cur_lsn = GetXLogReplayRecPtr(NULL);
165+
166+
/* If LSN had been Replayed */
167+
if (trg_lsn <= cur_lsn)
168+
break;
169+
170+
/* If the postmaster dies, finish immediately */
171+
if (!PostmasterIsAlive())
172+
break;
173+
174+
/* If Delay time is over */
175+
if (latch_events & WL_TIMEOUT)
176+
{
177+
tdelay -= (GetCurrentTimestamp() - timer);
178+
if (tdelay <= 0)
179+
break;
180+
timer = GetCurrentTimestamp();
181+
}
182+
183+
/* Tom Lane insists on! Discussion: <1661(dot)1469996911(at)sss(dot)pgh(dot)pa(dot)us> */
184+
CHECK_FOR_INTERRUPTS();
185+
WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay);
186+
}
187+
188+
WLDisownLatch();
189+
190+
if (trg_lsn > cur_lsn)
191+
elog(NOTICE,"LSN is not reached. Try to make bigger delay.");
192+
}

0 commit comments

Comments
 (0)