Skip to content

Commit 9a591c1

Browse files
committed
Fix statistics reporting in logical replication workers
This new arrangement ensures that statistics are reported right after commit of transactions. The previous arrangement didn't get this quite right and could lead to assertion failures. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reported-by: Erik Rijkers <er@xs4all.nl>
1 parent b6576e5 commit 9a591c1

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

src/backend/replication/logical/tablesync.c

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
274274
static List *table_states = NIL;
275275
static HTAB *last_start_times = NULL;
276276
ListCell *lc;
277+
bool started_tx = false;
277278

278279
Assert(!IsTransactionState());
279280

@@ -290,6 +291,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
290291
table_states = NIL;
291292

292293
StartTransactionCommand();
294+
started_tx = true;
293295

294296
/* Fetch all non-ready tables. */
295297
rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -304,8 +306,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
304306
}
305307
MemoryContextSwitchTo(oldctx);
306308

307-
CommitTransactionCommand();
308-
309309
table_states_valid = true;
310310
}
311311

@@ -350,11 +350,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
350350
{
351351
rstate->state = SUBREL_STATE_READY;
352352
rstate->lsn = current_lsn;
353-
StartTransactionCommand();
353+
if (!started_tx)
354+
{
355+
StartTransactionCommand();
356+
started_tx = true;
357+
}
354358
SetSubscriptionRelState(MyLogicalRepWorker->subid,
355359
rstate->relid, rstate->state,
356360
rstate->lsn);
357-
CommitTransactionCommand();
358361
}
359362
}
360363
else
@@ -457,6 +460,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
457460
}
458461
}
459462
}
463+
464+
if (started_tx)
465+
{
466+
CommitTransactionCommand();
467+
pgstat_report_stat(false);
468+
}
460469
}
461470

462471
/*
@@ -806,6 +815,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
806815
MyLogicalRepWorker->relstate,
807816
MyLogicalRepWorker->relstate_lsn);
808817
CommitTransactionCommand();
818+
pgstat_report_stat(false);
809819

810820
/*
811821
* We want to do the table data sync in single

src/backend/replication/logical/worker.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ apply_handle_commit(StringInfo s)
453453
replorigin_session_origin_timestamp = commit_data.committime;
454454

455455
CommitTransactionCommand();
456+
pgstat_report_stat(false);
456457

457458
store_flush_position(commit_data.end_lsn);
458459
}
@@ -462,7 +463,6 @@ apply_handle_commit(StringInfo s)
462463
/* Process any tables that are being synchronized in parallel. */
463464
process_syncing_tables(commit_data.end_lsn);
464465

465-
pgstat_report_stat(false);
466466
pgstat_report_activity(STATE_IDLE, NULL);
467467
}
468468

0 commit comments

Comments
 (0)