Skip to content

Commit 208c5d6

Browse files
author
Amit Kapila
committed
Add ALTER SUBSCRIPTION ... SKIP.
This feature allows skipping the transaction on subscriber nodes. If incoming change violates any constraint, logical replication stops until it's resolved. Currently, users need to either manually resolve the conflict by updating a subscriber-side database or by using function pg_replication_origin_advance() to skip the conflicting transaction. This commit introduces a simpler way to skip the conflicting transactions. The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX), which allows the apply worker to skip the transaction finished at specified LSN. The apply worker skips all data modification changes within the transaction. Author: Masahiko Sawada Reviewed-by: Takamichi Osumi, Hou Zhijie, Peter Eisentraut, Amit Kapila, Shi Yu, Vignesh C, Greg Nancarrow, Haiying Tang, Euler Taveira Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
1 parent 315ae75 commit 208c5d6

File tree

18 files changed

+665
-173
lines changed

18 files changed

+665
-173
lines changed

doc/src/sgml/catalogs.sgml

+10
Original file line numberDiff line numberDiff line change
@@ -7797,6 +7797,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
77977797
</para></entry>
77987798
</row>
77997799

7800+
<row>
7801+
<entry role="catalog_table_entry"><para role="column_definition">
7802+
<structfield>subskiplsn</structfield> <type>pg_lsn</type>
7803+
</para>
7804+
<para>
7805+
Finish LSN of the transaction whose changes are to be skipped, if a valid
7806+
LSN; otherwise <literal>0/0</literal>.
7807+
</para></entry>
7808+
</row>
7809+
78007810
<row>
78017811
<entry role="catalog_table_entry"><para role="column_definition">
78027812
<structfield>subconninfo</structfield> <type>text</type>

doc/src/sgml/logical-replication.sgml

+16-11
Original file line numberDiff line numberDiff line change
@@ -362,19 +362,24 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
362362
</screen>
363363
The LSN of the transaction that contains the change violating the constraint and
364364
the replication origin name can be found from the server log (LSN 0/14C0378 and
365-
replication origin <literal>pg_16395</literal> in the above case). To skip the
366-
transaction, the subscription needs to be disabled temporarily by
367-
<command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
365+
replication origin <literal>pg_16395</literal> in the above case). The
366+
transaction that produces conflict can be skipped by using
367+
<command>ALTER SUBSCRIPTION ... SKIP</command> with the finish LSN
368+
(i.e., LSN 0/14C0378). The finish LSN could be an LSN at which the transaction
369+
is committed or prepared on the publisher. Alternatively, the transaction can
370+
also be skipped by calling the <link linkend="pg-replication-origin-advance">
371+
<function>pg_replication_origin_advance()</function></link> function
372+
transaction. Before using this function, the subscription needs to be disabled
373+
temporarily either by <command>ALTER SUBSCRIPTION ... DISABLE</command> or, the
368374
subscription can be used with the <literal>disable_on_error</literal> option.
369-
Then, the transaction can be skipped by calling the
370-
<link linkend="pg-replication-origin-advance">
371-
<function>pg_replication_origin_advance()</function></link> function with
372-
the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
373-
next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication
374-
can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>. The current
375-
position of origins can be seen in the
376-
<link linkend="view-pg-replication-origin-status">
375+
Then, you can use <function>pg_replication_origin_advance()</function> function
376+
with the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>)
377+
and the next LSN of the finish LSN (i.e., 0/14C0379). The current position of
378+
origins can be seen in the <link linkend="view-pg-replication-origin-status">
377379
<structname>pg_replication_origin_status</structname></link> system view.
380+
Please note that skipping the whole transaction include skipping changes that
381+
might not violate any constraint. This can easily make the subscriber
382+
inconsistent.
378383
</para>
379384
</sect1>
380385

doc/src/sgml/ref/alter_subscription.sgml

+42
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB
2929
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
3030
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
3131
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
32+
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )
3233
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
3334
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
3435
</synopsis>
@@ -210,6 +211,47 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
210211
</listitem>
211212
</varlistentry>
212213

214+
<varlistentry>
215+
<term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )</literal></term>
216+
<listitem>
217+
<para>
218+
Skips applying all changes of the remote transaction. If incoming data
219+
violates any constraints, logical replication will stop until it is
220+
resolved. By using <command>ALTER SUBSCRIPTION ... SKIP</command> command,
221+
the logical replication worker skips all data modification changes within
222+
the transaction. This option has no effect on the transactions that are
223+
already prepared by enabling <literal>two_phase</literal> on
224+
subscriber.
225+
After logical replication worker successfully skips the transaction or
226+
finishes a transaction, LSN (stored in
227+
<structname>pg_subscription</structname>.<structfield>subskiplsn</structfield>)
228+
is cleared. See <xref linkend="logical-replication-conflicts"/> for
229+
the details of logical replication conflicts. Using this command requires
230+
superuser privilege.
231+
</para>
232+
233+
<para>
234+
<replaceable>skip_option</replaceable> specifies options for this operation.
235+
The supported option is:
236+
237+
<variablelist>
238+
<varlistentry>
239+
<term><literal>lsn</literal> (<type>pg_lsn</type>)</term>
240+
<listitem>
241+
<para>
242+
Specifies the finish LSN of the remote transaction whose changes
243+
are to be skipped by the logical replication worker. The finish LSN
244+
is the LSN at which the transaction is either committed or prepared.
245+
Skipping individual subtransaction is not supported. Setting
246+
<literal>NONE</literal> resets the LSN.
247+
</para>
248+
</listitem>
249+
</varlistentry>
250+
</variablelist>
251+
</para>
252+
</listitem>
253+
</varlistentry>
254+
213255
<varlistentry>
214256
<term><replaceable class="parameter">new_owner</replaceable></term>
215257
<listitem>

src/backend/catalog/pg_subscription.c

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
7070
sub->stream = subform->substream;
7171
sub->twophasestate = subform->subtwophasestate;
7272
sub->disableonerr = subform->subdisableonerr;
73+
sub->skiplsn = subform->subskiplsn;
7374

7475
/* Get conninfo */
7576
datum = SysCacheGetAttr(SUBSCRIPTIONOID,

src/backend/catalog/system_views.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
12611261
-- All columns of pg_subscription except subconninfo are publicly readable.
12621262
REVOKE ALL ON pg_subscription FROM public;
12631263
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
1264-
substream, subtwophasestate, subdisableonerr, subslotname,
1264+
substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname,
12651265
subsynccommit, subpublications)
12661266
ON pg_subscription TO public;
12671267

src/backend/commands/subscriptioncmds.c

+73
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "utils/guc.h"
4646
#include "utils/lsyscache.h"
4747
#include "utils/memutils.h"
48+
#include "utils/pg_lsn.h"
4849
#include "utils/syscache.h"
4950

5051
/*
@@ -62,6 +63,7 @@
6263
#define SUBOPT_STREAMING 0x00000100
6364
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
6465
#define SUBOPT_DISABLE_ON_ERR 0x00000400
66+
#define SUBOPT_LSN 0x00000800
6567

6668
/* check if the 'val' has 'bits' set */
6769
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -84,6 +86,7 @@ typedef struct SubOpts
8486
bool streaming;
8587
bool twophase;
8688
bool disableonerr;
89+
XLogRecPtr lsn;
8790
} SubOpts;
8891

8992
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
262265
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
263266
opts->disableonerr = defGetBoolean(defel);
264267
}
268+
else if (IsSet(supported_opts, SUBOPT_LSN) &&
269+
strcmp(defel->defname, "lsn") == 0)
270+
{
271+
char *lsn_str = defGetString(defel);
272+
XLogRecPtr lsn;
273+
274+
if (IsSet(opts->specified_opts, SUBOPT_LSN))
275+
errorConflictingDefElem(defel, pstate);
276+
277+
/* Setting lsn = NONE is treated as resetting LSN */
278+
if (strcmp(lsn_str, "none") == 0)
279+
lsn = InvalidXLogRecPtr;
280+
else
281+
{
282+
/* Parse the argument as LSN */
283+
lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
284+
CStringGetDatum(lsn_str)));
285+
286+
if (XLogRecPtrIsInvalid(lsn))
287+
ereport(ERROR,
288+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
289+
errmsg("invalid WAL location (LSN): %s", lsn_str)));
290+
}
291+
292+
opts->specified_opts |= SUBOPT_LSN;
293+
opts->lsn = lsn;
294+
}
265295
else
266296
ereport(ERROR,
267297
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
479509
LOGICALREP_TWOPHASE_STATE_PENDING :
480510
LOGICALREP_TWOPHASE_STATE_DISABLED);
481511
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
512+
values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
482513
values[Anum_pg_subscription_subconninfo - 1] =
483514
CStringGetTextDatum(conninfo);
484515
if (opts.slot_name)
@@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
11061137
break;
11071138
}
11081139

1140+
case ALTER_SUBSCRIPTION_SKIP:
1141+
{
1142+
parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1143+
1144+
/* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1145+
Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1146+
1147+
if (!superuser())
1148+
ereport(ERROR,
1149+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1150+
errmsg("must be superuser to skip transaction")));
1151+
1152+
/*
1153+
* If the user sets subskiplsn, we do a sanity check to make
1154+
* sure that the specified LSN is a probable value.
1155+
*/
1156+
if (!XLogRecPtrIsInvalid(opts.lsn))
1157+
{
1158+
RepOriginId originid;
1159+
char originname[NAMEDATALEN];
1160+
XLogRecPtr remote_lsn;
1161+
1162+
snprintf(originname, sizeof(originname), "pg_%u", subid);
1163+
originid = replorigin_by_name(originname, false);
1164+
remote_lsn = replorigin_get_progress(originid, false);
1165+
1166+
/* Check the given LSN is at least a future LSN */
1167+
if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1168+
ereport(ERROR,
1169+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1170+
errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1171+
LSN_FORMAT_ARGS(opts.lsn),
1172+
LSN_FORMAT_ARGS(remote_lsn))));
1173+
}
1174+
1175+
values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1176+
replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1177+
1178+
update_tuple = true;
1179+
break;
1180+
}
1181+
11091182
default:
11101183
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
11111184
stmt->kind);

src/backend/parser/gram.y

+9
Original file line numberDiff line numberDiff line change
@@ -9983,6 +9983,15 @@ AlterSubscriptionStmt:
99839983
(Node *)makeBoolean(false), @1));
99849984
$$ = (Node *)n;
99859985
}
9986+
| ALTER SUBSCRIPTION name SKIP definition
9987+
{
9988+
AlterSubscriptionStmt *n =
9989+
makeNode(AlterSubscriptionStmt);
9990+
n->kind = ALTER_SUBSCRIPTION_SKIP;
9991+
n->subname = $3;
9992+
n->options = $5;
9993+
$$ = (Node *)n;
9994+
}
99869995
;
99879996

99889997
/*****************************************************************************

0 commit comments

Comments
 (0)