Skip to content

Commit 2bef06d

Browse files
committed
Preserve required !catalog tuples while computing initial decoding snapshot.
The logical decoding machinery already preserved all the required catalog tuples, which is sufficient in the course of normal logical decoding, but did not guarantee that non-catalog tuples were preserved during computation of the initial snapshot when creating a slot over the replication protocol. This could cause a corrupted initial snapshot to be exported. The time window for issues is usually not terribly large, but on a busy server it's perfectly possible to hit it. Ongoing decoding is not affected by this bug. To avoid increased overhead for the SQL API, only retain additional tuples when a logical slot is being created over the replication protocol. To do so this commit changes the signature of CreateInitDecodingContext(), but it seems unlikely that it's being used in an extension, so that's probably ok. In a drive-by fix, fix handling of ReplicationSlotsComputeRequiredXmin's already_locked argument, which should only apply to ProcArrayLock, not ReplicationSlotControlLock. Reported-By: Erik Rijkers Analyzed-By: Petr Jelinek Author: Petr Jelinek, heavily editorialized by Andres Freund Reviewed-By: Andres Freund Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6a95@2ndquadrant.com Backport: 9.4, where logical decoding was introduced.
1 parent fa31b6f commit 2bef06d

File tree

8 files changed

+66
-18
lines changed

8 files changed

+66
-18
lines changed

src/backend/replication/logical/logical.c

+17-8
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ StartupDecodingContext(List *output_plugin_options,
210210
LogicalDecodingContext *
211211
CreateInitDecodingContext(char *plugin,
212212
List *output_plugin_options,
213+
bool need_full_snapshot,
213214
XLogPageReadCB read_page,
214215
LogicalOutputPluginWriterPrepareWrite prepare_write,
215216
LogicalOutputPluginWriterWrite do_write)
@@ -267,23 +268,31 @@ CreateInitDecodingContext(char *plugin,
267268
* the slot machinery about the new limit. Once that's done the
268269
* ProcArrayLock can be released as the slot machinery now is
269270
* protecting against vacuum.
271+
*
272+
* Note that, temporarily, the data, not just the catalog, xmin has to be
273+
* reserved if a data snapshot is to be exported. Otherwise the initial
274+
* data snapshot created here is not guaranteed to be valid. After that
275+
* the data xmin doesn't need to be managed anymore and the global xmin
276+
* should be recomputed. As we are fine with losing the pegged data xmin
277+
* after crash - no chance a snapshot would get exported anymore - we can
278+
* get away with just setting the slot's
279+
* effective_xmin. ReplicationSlotRelease will reset it again.
280+
*
270281
* ----
271282
*/
272283
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
273284

274-
slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
275-
slot->data.catalog_xmin = slot->effective_catalog_xmin;
285+
xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot);
286+
287+
slot->effective_catalog_xmin = xmin_horizon;
288+
slot->data.catalog_xmin = xmin_horizon;
289+
if (need_full_snapshot)
290+
slot->effective_xmin = xmin_horizon;
276291

277292
ReplicationSlotsComputeRequiredXmin(true);
278293

279294
LWLockRelease(ProcArrayLock);
280295

281-
/*
282-
* tell the snapshot builder to only assemble snapshot once reaching the
283-
* running_xact's record with the respective xmin.
284-
*/
285-
xmin_horizon = slot->data.catalog_xmin;
286-
287296
ReplicationSlotMarkDirty();
288297
ReplicationSlotSave();
289298

src/backend/replication/logical/snapbuild.c

+12
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,18 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
533533
* mechanism. Due to that we can do this without locks, we're only
534534
* changing our own value.
535535
*/
536+
#ifdef USE_ASSERT_CHECKING
537+
{
538+
TransactionId safeXid;
539+
540+
LWLockAcquire(ProcArrayLock, LW_SHARED);
541+
safeXid = GetOldestSafeDecodingTransactionId(true);
542+
LWLockRelease(ProcArrayLock);
543+
544+
Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
545+
}
546+
#endif
547+
536548
MyPgXact->xmin = snap->xmin;
537549

538550
/* allocate in transaction context */

src/backend/replication/slot.c

+21-4
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,22 @@ ReplicationSlotRelease(void)
398398
SpinLockRelease(&slot->mutex);
399399
}
400400

401+
402+
/*
403+
* If slot needed to temporarily restrain both data and catalog xmin to
404+
* create the catalog snapshot, remove that temporary constraint.
405+
* Snapshots can only be exported while the initial snapshot is still
406+
* acquired.
407+
*/
408+
if (!TransactionIdIsValid(slot->data.xmin) &&
409+
TransactionIdIsValid(slot->effective_xmin))
410+
{
411+
SpinLockAcquire(&slot->mutex);
412+
slot->effective_xmin = InvalidTransactionId;
413+
SpinLockRelease(&slot->mutex);
414+
ReplicationSlotsComputeRequiredXmin(false);
415+
}
416+
401417
MyReplicationSlot = NULL;
402418

403419
/* might not have been set when we've been a plain slot */
@@ -612,6 +628,9 @@ ReplicationSlotPersist(void)
612628

613629
/*
614630
* Compute the oldest xmin across all slots and store it in the ProcArray.
631+
*
632+
* If already_locked is true, ProcArrayLock has already been acquired
633+
* exclusively.
615634
*/
616635
void
617636
ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -622,8 +641,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
622641

623642
Assert(ReplicationSlotCtl != NULL);
624643

625-
if (!already_locked)
626-
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
644+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
627645

628646
for (i = 0; i < max_replication_slots; i++)
629647
{
@@ -652,8 +670,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
652670
agg_catalog_xmin = effective_catalog_xmin;
653671
}
654672

655-
if (!already_locked)
656-
LWLockRelease(ReplicationSlotControlLock);
673+
LWLockRelease(ReplicationSlotControlLock);
657674

658675
ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
659676
}

src/backend/replication/slotfuncs.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
131131
/*
132132
* Create logical decoding context, to build the initial snapshot.
133133
*/
134-
ctx = CreateInitDecodingContext(
135-
NameStr(*plugin), NIL,
134+
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
135+
false, /* do not build snapshot */
136136
logical_read_local_xlog_page, NULL, NULL);
137137

138138
/* build initial snapshot, might take a while */

src/backend/replication/walsender.c

+1
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
909909
}
910910

911911
ctx = CreateInitDecodingContext(cmd->plugin, NIL,
912+
true, /* build snapshot */
912913
logical_read_xlog_page,
913914
WalSndPrepareWrite, WalSndWriteData);
914915

src/backend/storage/ipc/procarray.c

+11-3
Original file line numberDiff line numberDiff line change
@@ -2151,7 +2151,7 @@ GetOldestActiveTransactionId(void)
21512151
* that the caller will immediately use the xid to peg the xmin horizon.
21522152
*/
21532153
TransactionId
2154-
GetOldestSafeDecodingTransactionId(void)
2154+
GetOldestSafeDecodingTransactionId(bool catalogOnly)
21552155
{
21562156
ProcArrayStruct *arrayP = procArray;
21572157
TransactionId oldestSafeXid;
@@ -2174,9 +2174,17 @@ GetOldestSafeDecodingTransactionId(void)
21742174
/*
21752175
* If there's already a slot pegging the xmin horizon, we can start with
21762176
* that value, it's guaranteed to be safe since it's computed by this
2177-
* routine initially and has been enforced since.
2177+
* routine initially and has been enforced since. We can always use the
2178+
* slot's general xmin horizon, but the catalog horizon is only usable
2179+
* when only catalog data is going to be looked at.
21782180
*/
2179-
if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2181+
if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2182+
TransactionIdPrecedes(procArray->replication_slot_xmin,
2183+
oldestSafeXid))
2184+
oldestSafeXid = procArray->replication_slot_xmin;
2185+
2186+
if (catalogOnly &&
2187+
TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
21802188
TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
21812189
oldestSafeXid))
21822190
oldestSafeXid = procArray->replication_slot_catalog_xmin;

src/include/replication/logical.h

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ extern void CheckLogicalDecodingRequirements(void);
8282

8383
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
8484
List *output_plugin_options,
85+
bool need_full_snapshot,
8586
XLogPageReadCB read_page,
8687
LogicalOutputPluginWriterPrepareWrite prepare_write,
8788
LogicalOutputPluginWriterWrite do_write);

src/include/storage/procarray.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
8888
extern bool TransactionIdIsActive(TransactionId xid);
8989
extern TransactionId GetOldestXmin(Relation rel, int flags);
9090
extern TransactionId GetOldestActiveTransactionId(void);
91-
extern TransactionId GetOldestSafeDecodingTransactionId(void);
91+
extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
9292

9393
extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
9494
extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);

0 commit comments

Comments
 (0)