Skip to content

Commit 9a17be1

Browse files
author
Amit Kapila
committed
Allow upgrades to preserve the full subscription's state.
This feature will allow us to replicate the changes on subscriber nodes after the upgrade. Previously, only the subscription metadata information was preserved. Without the list of relations and their state, it's not possible to re-enable the subscriptions without missing some records as the list of relations can only be refreshed after enabling the subscription (and therefore starting the apply worker). Even if we added a way to refresh the subscription while enabling a publication, we still wouldn't know which relations are new on the publication side, and therefore should be fully synced, and which shouldn't. To preserve the subscription relations, this patch teaches pg_dump to restore the content of pg_subscription_rel from the old cluster by using binary_upgrade_add_sub_rel_state SQL function. This is supported only in binary upgrade mode. The subscription's replication origin is needed to ensure that we don't replicate anything twice. To preserve the replication origins, this patch teaches pg_dump to update the replication origin along with creating a subscription by using binary_upgrade_replorigin_advance SQL function to restore the underlying replication origin remote LSN. This is supported only in binary upgrade mode. pg_upgrade will check that all the subscription relations are in 'i' (init) or in 'r' (ready) state and will error out if that's not the case, logging the reason for the failure. This helps to avoid the risk of any dangling slot or origin after the upgrade. Author: Vignesh C, Julien Rouhaud, Shlok Kyal Reviewed-by: Peter Smith, Masahiko Sawada, Michael Paquier, Amit Kapila, Hayato Kuroda Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
1 parent cea89c9 commit 9a17be1

File tree

17 files changed

+1032
-16
lines changed

17 files changed

+1032
-16
lines changed

doc/src/sgml/ref/pgupgrade.sgml

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,56 @@ make prefix=/usr/local/pgsql.new install
456456

457457
</step>
458458

459+
<step>
460+
<title>Prepare for subscriber upgrades</title>
461+
462+
<para>
463+
Setup the <link linkend="logical-replication-config-subscriber">
464+
subscriber configurations</link> in the new subscriber.
465+
<application>pg_upgrade</application> attempts to migrate subscription
466+
dependencies which includes the subscription's table information present in
467+
<link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>
468+
system catalog and also the subscription's replication origin. This allows
469+
logical replication on the new subscriber to continue from where the
470+
old subscriber was up to. Migration of subscription dependencies is only
471+
supported when the old cluster is version 17.0 or later. Subscription
472+
dependencies on clusters before version 17.0 will silently be ignored.
473+
</para>
474+
475+
<para>
476+
There are some prerequisites for <application>pg_upgrade</application> to
477+
be able to upgrade the subscriptions. If these are not met an error
478+
will be reported.
479+
</para>
480+
481+
<itemizedlist>
482+
<listitem>
483+
<para>
484+
All the subscription tables in the old subscriber should be in state
485+
<literal>i</literal> (initialize) or <literal>r</literal> (ready). This
486+
can be verified by checking <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsubstate</structfield>.
487+
</para>
488+
</listitem>
489+
<listitem>
490+
<para>
491+
The replication origin entry corresponding to each of the subscriptions
492+
should exist in the old cluster. This can be found by checking
493+
<link linkend="catalog-pg-subscription">pg_subscription</link> and
494+
<link linkend="catalog-pg-replication-origin">pg_replication_origin</link>
495+
system tables.
496+
</para>
497+
</listitem>
498+
<listitem>
499+
<para>
500+
The new cluster must have
501+
<link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
502+
configured to a value greater than or equal to the number of
503+
subscriptions present in the old cluster.
504+
</para>
505+
</listitem>
506+
</itemizedlist>
507+
</step>
508+
459509
<step>
460510
<title>Stop both servers</title>
461511

src/backend/catalog/pg_subscription.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,14 @@ textarray_to_stringlist(ArrayType *textarray)
228228

229229
/*
230230
* Add new state record for a subscription table.
231+
*
232+
* If retain_lock is true, then don't release the locks taken in this function.
233+
* We normally release the locks at the end of transaction but in binary-upgrade
234+
* mode, we expect to release those immediately.
231235
*/
232236
void
233237
AddSubscriptionRelState(Oid subid, Oid relid, char state,
234-
XLogRecPtr sublsn)
238+
XLogRecPtr sublsn, bool retain_lock)
235239
{
236240
Relation rel;
237241
HeapTuple tup;
@@ -269,7 +273,15 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
269273
heap_freetuple(tup);
270274

271275
/* Cleanup. */
272-
table_close(rel, NoLock);
276+
if (retain_lock)
277+
{
278+
table_close(rel, NoLock);
279+
}
280+
else
281+
{
282+
table_close(rel, RowExclusiveLock);
283+
UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
284+
}
273285
}
274286

275287
/*

src/backend/commands/subscriptioncmds.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
773773
rv->schemaname, rv->relname);
774774

775775
AddSubscriptionRelState(subid, relid, table_state,
776-
InvalidXLogRecPtr);
776+
InvalidXLogRecPtr, true);
777777
}
778778

779779
/*
@@ -943,7 +943,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
943943
{
944944
AddSubscriptionRelState(sub->oid, relid,
945945
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
946-
InvalidXLogRecPtr);
946+
InvalidXLogRecPtr, true);
947947
ereport(DEBUG1,
948948
(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
949949
rv->schemaname, rv->relname, sub->name)));

src/backend/utils/adt/pg_upgrade_support.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,24 @@
1111

1212
#include "postgres.h"
1313

14+
#include "access/relation.h"
15+
#include "access/table.h"
1416
#include "catalog/binary_upgrade.h"
1517
#include "catalog/heap.h"
1618
#include "catalog/namespace.h"
19+
#include "catalog/pg_subscription_rel.h"
1720
#include "catalog/pg_type.h"
1821
#include "commands/extension.h"
1922
#include "miscadmin.h"
2023
#include "replication/logical.h"
24+
#include "replication/origin.h"
25+
#include "replication/worker_internal.h"
26+
#include "storage/lmgr.h"
2127
#include "utils/array.h"
2228
#include "utils/builtins.h"
29+
#include "utils/lsyscache.h"
30+
#include "utils/pg_lsn.h"
31+
#include "utils/syscache.h"
2332

2433

2534
#define CHECK_IS_BINARY_UPGRADE \
@@ -305,3 +314,100 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
305314

306315
PG_RETURN_BOOL(!found_pending_wal);
307316
}
317+
318+
/*
319+
* binary_upgrade_add_sub_rel_state
320+
*
321+
* Add the relation with the specified relation state to pg_subscription_rel
322+
* catalog.
323+
*/
324+
Datum
325+
binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS)
326+
{
327+
Relation subrel;
328+
Relation rel;
329+
Oid subid;
330+
char *subname;
331+
Oid relid;
332+
char relstate;
333+
XLogRecPtr sublsn;
334+
335+
CHECK_IS_BINARY_UPGRADE;
336+
337+
/* We must check these things before dereferencing the arguments */
338+
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
339+
elog(ERROR, "null argument to binary_upgrade_add_sub_rel_state is not allowed");
340+
341+
subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
342+
relid = PG_GETARG_OID(1);
343+
relstate = PG_GETARG_CHAR(2);
344+
sublsn = PG_ARGISNULL(3) ? InvalidXLogRecPtr : PG_GETARG_LSN(3);
345+
346+
subrel = table_open(SubscriptionRelationId, RowExclusiveLock);
347+
subid = get_subscription_oid(subname, false);
348+
rel = relation_open(relid, AccessShareLock);
349+
350+
/*
351+
* Since there are no concurrent ALTER/DROP SUBSCRIPTION commands during
352+
* the upgrade process, and the apply worker (which builds cache based on
353+
* the subscription catalog) is not running, the locks can be released
354+
* immediately.
355+
*/
356+
AddSubscriptionRelState(subid, relid, relstate, sublsn, false);
357+
relation_close(rel, AccessShareLock);
358+
table_close(subrel, RowExclusiveLock);
359+
360+
PG_RETURN_VOID();
361+
}
362+
363+
/*
364+
* binary_upgrade_replorigin_advance
365+
*
366+
* Update the remote_lsn for the subscriber's replication origin.
367+
*/
368+
Datum
369+
binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
370+
{
371+
Relation rel;
372+
Oid subid;
373+
char *subname;
374+
char originname[NAMEDATALEN];
375+
RepOriginId node;
376+
XLogRecPtr remote_commit;
377+
378+
CHECK_IS_BINARY_UPGRADE;
379+
380+
/*
381+
* We must ensure a non-NULL subscription name before dereferencing the
382+
* arguments.
383+
*/
384+
if (PG_ARGISNULL(0))
385+
elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed");
386+
387+
subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
388+
remote_commit = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1);
389+
390+
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
391+
subid = get_subscription_oid(subname, false);
392+
393+
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
394+
395+
/* Lock to prevent the replication origin from vanishing */
396+
LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
397+
node = replorigin_by_name(originname, false);
398+
399+
/*
400+
* The server will be stopped after setting up the objects in the new
401+
* cluster and the origins will be flushed during the shutdown checkpoint.
402+
* This will ensure that the latest LSN values for origin will be
403+
* available after the upgrade.
404+
*/
405+
replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
406+
false /* backward */ ,
407+
false /* WAL log */ );
408+
409+
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
410+
table_close(rel, RowExclusiveLock);
411+
412+
PG_RETURN_VOID();
413+
}

src/bin/pg_dump/common.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "catalog/pg_operator_d.h"
2525
#include "catalog/pg_proc_d.h"
2626
#include "catalog/pg_publication_d.h"
27+
#include "catalog/pg_subscription_d.h"
2728
#include "catalog/pg_type_d.h"
2829
#include "common/hashfn.h"
2930
#include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
265266
pg_log_info("reading subscriptions");
266267
getSubscriptions(fout);
267268

269+
pg_log_info("reading subscription membership of tables");
270+
getSubscriptionTables(fout);
271+
268272
free(inhinfo); /* not needed any longer */
269273

270274
*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
978982
return (PublicationInfo *) dobj;
979983
}
980984

985+
/*
986+
* findSubscriptionByOid
987+
* finds the DumpableObject for the subscription with the given oid
988+
* returns NULL if not found
989+
*/
990+
SubscriptionInfo *
991+
findSubscriptionByOid(Oid oid)
992+
{
993+
CatalogId catId;
994+
DumpableObject *dobj;
995+
996+
catId.tableoid = SubscriptionRelationId;
997+
catId.oid = oid;
998+
dobj = findObjectByCatalogId(catId);
999+
Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
1000+
return (SubscriptionInfo *) dobj;
1001+
}
1002+
9811003

9821004
/*
9831005
* recordExtensionMembership

0 commit comments

Comments
 (0)