Skip to content

Commit 56e19d9

Browse files
committed
Don't use on-disk snapshots for exported logical decoding snapshot.
Logical decoding stores historical snapshots on disk, so that logical decoding can restart without having to reconstruct a snapshot from scratch (for which the resources are not guaranteed to be present anymore). These serialized snapshots were also used when creating a new slot via the walsender interface, which can export a "full" snapshot (i.e. one that can read all tables, not just catalog ones). The problem is that the serialized snapshots are only useful for catalogs and not for normal user tables. Thus the use of such a serialized snapshot could result in an inconsistent snapshot being exported, which could lead to queries returning wrong data. This would only happen if logical slots are created while another logical slot already exists. Author: Petr Jelinek Reviewed-By: Andres Freund Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb214@2ndquadrant.com Backport: 9.4, where logical decoding was introduced.
1 parent 7834d20 commit 56e19d9

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

src/backend/replication/logical/logical.c

+6-3
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ static LogicalDecodingContext *
114114
StartupDecodingContext(List *output_plugin_options,
115115
XLogRecPtr start_lsn,
116116
TransactionId xmin_horizon,
117+
bool need_full_snapshot,
117118
XLogPageReadCB read_page,
118119
LogicalOutputPluginWriterPrepareWrite prepare_write,
119120
LogicalOutputPluginWriterWrite do_write)
@@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options,
171172

172173
ctx->reorder = ReorderBufferAllocate();
173174
ctx->snapshot_builder =
174-
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn);
175+
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
176+
need_full_snapshot);
175177

176178
ctx->reorder->private_data = ctx;
177179

@@ -297,7 +299,8 @@ CreateInitDecodingContext(char *plugin,
297299
ReplicationSlotSave();
298300

299301
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
300-
read_page, prepare_write, do_write);
302+
need_full_snapshot, read_page, prepare_write,
303+
do_write);
301304

302305
/* call output plugin initialization callback */
303306
old_context = MemoryContextSwitchTo(ctx->context);
@@ -386,7 +389,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
386389
}
387390

388391
ctx = StartupDecodingContext(output_plugin_options,
389-
start_lsn, InvalidTransactionId,
392+
start_lsn, InvalidTransactionId, false,
390393
read_page, prepare_write, do_write);
391394

392395
/* call output plugin initialization callback */

src/backend/replication/logical/snapbuild.c

+14-5
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ struct SnapBuild
165165
*/
166166
TransactionId initial_xmin_horizon;
167167

168+
/* Indicates if we are building full snapshot or just catalog one .*/
169+
bool building_full_snapshot;
170+
168171
/*
169172
* Snapshot that's valid to see the catalog state seen at this moment.
170173
*/
@@ -281,7 +284,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
281284
SnapBuild *
282285
AllocateSnapshotBuilder(ReorderBuffer *reorder,
283286
TransactionId xmin_horizon,
284-
XLogRecPtr start_lsn)
287+
XLogRecPtr start_lsn,
288+
bool need_full_snapshot)
285289
{
286290
MemoryContext context;
287291
MemoryContext oldcontext;
@@ -308,6 +312,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
308312

309313
builder->initial_xmin_horizon = xmin_horizon;
310314
builder->start_decoding_at = start_lsn;
315+
builder->building_full_snapshot = need_full_snapshot;
311316

312317
MemoryContextSwitchTo(oldcontext);
313318

@@ -1245,7 +1250,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12451250
*
12461251
* a) There were no running transactions when the xl_running_xacts record
12471252
* was inserted, jump to CONSISTENT immediately. We might find such a
1248-
* state we were waiting for b) and c).
1253+
* state we were waiting for b) or c).
12491254
*
12501255
* b) Wait for all toplevel transactions that were running to end. We
12511256
* simply track the number of in-progress toplevel transactions and
@@ -1260,7 +1265,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12601265
* at all.
12611266
*
12621267
* c) This (in a previous run) or another decoding slot serialized a
1263-
* snapshot to disk that we can use.
1268+
* snapshot to disk that we can use. Can't use this method for the
1269+
* initial snapshot when slot is being created and needs full snapshot
1270+
* for export or direct use, as that snapshot will only contain catalog
1271+
* modifying transactions.
12641272
* ---
12651273
*/
12661274

@@ -1315,8 +1323,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
13151323

13161324
return false;
13171325
}
1318-
/* c) valid on disk state */
1319-
else if (SnapBuildRestore(builder, lsn))
1326+
/* c) valid on disk state and not building full snapshot */
1327+
else if (!builder->building_full_snapshot &&
1328+
SnapBuildRestore(builder, lsn))
13201329
{
13211330
/* there won't be any state to cleanup */
13221331
return false;

src/include/replication/snapbuild.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ struct xl_running_xacts;
5454
extern void CheckPointSnapBuild(void);
5555

5656
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
57-
TransactionId xmin_horizon, XLogRecPtr start_lsn);
57+
TransactionId xmin_horizon, XLogRecPtr start_lsn,
58+
bool need_full_snapshot);
5859
extern void FreeSnapshotBuilder(SnapBuild *cache);
5960

6061
extern void SnapBuildSnapDecRefcount(Snapshot snap);

0 commit comments

Comments
 (0)