Skip to content

Commit 6f132ed

Browse files
author
Amit Kapila
committed
Allow synced slots to have their inactive_since.
This commit does two things: 1) Maintains inactive_since for sync slots whenever the slot is released just like any other regular slot. 2) Ensures the value is set to the current timestamp during the promotion of standby to help correctly interpret the time after promotion. We don't want the slots to appear inactive for a long time after promotion if they haven't been synchronized recently. This would also avoid the invalidation of such slots immediately after promotion if tomorrow we have a feature that invalidates slots based on their inactivity time. Whoever acquires the slot i.e. makes the slot active will reset it to NULL. Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik, Masahiko Sawada Discussion: https://postgr.es/m/CAA4eK1KrPGwfZV9LYGidjxHeW+rxJ=E2ThjXvwRGLO=iLNuo=Q@mail.gmail.com Discussion: https://postgr.es/m/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com Discussion: https://postgr.es/m/CA+Tgmob_Ta-t2ty8QrKHBGnNLrf4ZYcwhGHGFsuUoFrAEDw4sA@mail.gmail.com
1 parent f98dbde commit 6f132ed

File tree

6 files changed

+160
-39
lines changed

6 files changed

+160
-39
lines changed

doc/src/sgml/system-views.sgml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2530,6 +2530,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
25302530
<para>
25312531
The time since the slot has become inactive.
25322532
<literal>NULL</literal> if the slot is currently being used.
2533+
Note that for slots on the standby that are being synced from a
2534+
primary server (whose <structfield>synced</structfield> field is
2535+
<literal>true</literal>), the
2536+
<structfield>inactive_since</structfield> indicates the last
2537+
synchronization (see
2538+
<xref linkend="logicaldecoding-replication-slots-synchronization"/>)
2539+
time.
25332540
</para></entry>
25342541
</row>
25352542

src/backend/replication/logical/slotsync.c

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ typedef struct RemoteSlot
150150
} RemoteSlot;
151151

152152
static void slotsync_failure_callback(int code, Datum arg);
153+
static void update_synced_slots_inactive_since(void);
153154

154155
/*
155156
* If necessary, update the local synced slot's metadata based on the data
@@ -584,6 +585,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
584585
* overwriting 'invalidated' flag to remote_slot's value. See
585586
* InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
586587
* if the slot is not acquired by other processes.
588+
*
589+
* XXX: If it ever turns out that slot acquire/release is costly for
590+
* cases when none of the slot properties is changed then we can do a
591+
* pre-check to ensure that at least one of the slot properties is
592+
* changed before acquiring the slot.
587593
*/
588594
ReplicationSlotAcquire(remote_slot->name, true);
589595

@@ -1355,6 +1361,54 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
13551361
Assert(false);
13561362
}
13571363

1364+
/*
1365+
* Update the inactive_since property for synced slots.
1366+
*
1367+
* Note that this function is currently called when we shutdown the slot
1368+
* sync machinery.
1369+
*/
1370+
static void
1371+
update_synced_slots_inactive_since(void)
1372+
{
1373+
TimestampTz now = 0;
1374+
1375+
/*
1376+
* We need to update inactive_since only when we are promoting standby to
1377+
* correctly interpret the inactive_since if the standby gets promoted
1378+
* without a restart. We don't want the slots to appear inactive for a
1379+
* long time after promotion if they haven't been synchronized recently.
1380+
* Whoever acquires the slot i.e.makes the slot active will reset it.
1381+
*/
1382+
if (!StandbyMode)
1383+
return;
1384+
1385+
/* The slot sync worker mustn't be running by now */
1386+
Assert(SlotSyncCtx->pid == InvalidPid);
1387+
1388+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1389+
1390+
for (int i = 0; i < max_replication_slots; i++)
1391+
{
1392+
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1393+
1394+
/* Check if it is a synchronized slot */
1395+
if (s->in_use && s->data.synced)
1396+
{
1397+
Assert(SlotIsLogical(s));
1398+
1399+
/* Use the same inactive_since time for all the slots. */
1400+
if (now == 0)
1401+
now = GetCurrentTimestamp();
1402+
1403+
SpinLockAcquire(&s->mutex);
1404+
s->inactive_since = now;
1405+
SpinLockRelease(&s->mutex);
1406+
}
1407+
}
1408+
1409+
LWLockRelease(ReplicationSlotControlLock);
1410+
}
1411+
13581412
/*
13591413
* Shut down the slot sync worker.
13601414
*/
@@ -1368,6 +1422,7 @@ ShutDownSlotSync(void)
13681422
if (SlotSyncCtx->pid == InvalidPid)
13691423
{
13701424
SpinLockRelease(&SlotSyncCtx->mutex);
1425+
update_synced_slots_inactive_since();
13711426
return;
13721427
}
13731428
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1400,6 +1455,8 @@ ShutDownSlotSync(void)
14001455
}
14011456

14021457
SpinLockRelease(&SlotSyncCtx->mutex);
1458+
1459+
update_synced_slots_inactive_since();
14031460
}
14041461

14051462
/*

src/backend/replication/slot.c

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -690,13 +690,10 @@ ReplicationSlotRelease(void)
690690
}
691691

692692
/*
693-
* Set the last inactive time after marking the slot inactive. We don't
694-
* set it for the slots currently being synced from the primary to the
695-
* standby because such slots are typically inactive as decoding is not
696-
* allowed on those.
693+
* Set the time since the slot has become inactive. We get the current
694+
* time beforehand to avoid system call while holding the spinlock.
697695
*/
698-
if (!(RecoveryInProgress() && slot->data.synced))
699-
now = GetCurrentTimestamp();
696+
now = GetCurrentTimestamp();
700697

701698
if (slot->data.persistency == RS_PERSISTENT)
702699
{
@@ -2369,16 +2366,11 @@ RestoreSlotFromDisk(const char *name)
23692366
slot->active_pid = 0;
23702367

23712368
/*
2372-
* We set the last inactive time after loading the slot from the disk
2373-
* into memory. Whoever acquires the slot i.e. makes the slot active
2374-
* will reset it. We don't set it for the slots currently being synced
2375-
* from the primary to the standby because such slots are typically
2376-
* inactive as decoding is not allowed on those.
2369+
* Set the time since the slot has become inactive after loading the
2370+
* slot from the disk into memory. Whoever acquires the slot i.e.
2371+
* makes the slot active will reset it.
23772372
*/
2378-
if (!(RecoveryInProgress() && slot->data.synced))
2379-
slot->inactive_since = GetCurrentTimestamp();
2380-
else
2381-
slot->inactive_since = 0;
2373+
slot->inactive_since = GetCurrentTimestamp();
23822374

23832375
restored = true;
23842376
break;

src/test/perl/PostgreSQL/Test/Cluster.pm

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3276,6 +3276,37 @@ sub create_logical_slot_on_standby
32763276

32773277
=pod
32783278
3279+
=item $node->validate_slot_inactive_since(self, slot_name, reference_time)
3280+
3281+
Validate inactive_since value of a given replication slot against the reference
3282+
time and return it.
3283+
3284+
=cut
3285+
3286+
sub validate_slot_inactive_since
3287+
{
3288+
my ($self, $slot_name, $reference_time) = @_;
3289+
my $name = $self->name;
3290+
3291+
my $inactive_since = $self->safe_psql('postgres',
3292+
qq(SELECT inactive_since FROM pg_replication_slots
3293+
WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
3294+
);
3295+
3296+
# Check that the inactive_since is sane
3297+
is($self->safe_psql('postgres',
3298+
qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
3299+
'$inactive_since'::timestamptz > '$reference_time'::timestamptz;]
3300+
),
3301+
't',
3302+
"last inactive time for slot $slot_name is valid on node $name")
3303+
or die "could not validate captured inactive_since for slot $slot_name";
3304+
3305+
return $inactive_since;
3306+
}
3307+
3308+
=pod
3309+
32793310
=item $node->advance_wal(num)
32803311
32813312
Advance WAL of node by given number of segments.

src/test/recovery/t/019_replslot_limit.pl

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@
443443
# Get inactive_since value after the slot's creation. Note that the slot is
444444
# still inactive till it's used by the standby below.
445445
my $inactive_since =
446-
capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
446+
$primary4->validate_slot_inactive_since($sb4_slot, $slot_creation_time);
447447

448448
$standby4->start;
449449

@@ -502,7 +502,7 @@
502502
# Get inactive_since value after the slot's creation. Note that the slot is
503503
# still inactive till it's used by the subscriber below.
504504
$inactive_since =
505-
capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
505+
$publisher4->validate_slot_inactive_since($lsub4_slot, $slot_creation_time);
506506

507507
$subscriber4->start;
508508
$subscriber4->safe_psql('postgres',
@@ -540,26 +540,4 @@
540540
$publisher4->stop;
541541
$subscriber4->stop;
542542

543-
# Capture and validate inactive_since of a given slot.
544-
sub capture_and_validate_slot_inactive_since
545-
{
546-
my ($node, $slot_name, $slot_creation_time) = @_;
547-
548-
my $inactive_since = $node->safe_psql('postgres',
549-
qq(SELECT inactive_since FROM pg_replication_slots
550-
WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
551-
);
552-
553-
# Check that the captured time is sane
554-
is( $node->safe_psql(
555-
'postgres',
556-
qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
557-
'$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
558-
),
559-
't',
560-
"last inactive time for an active slot $slot_name is sane");
561-
562-
return $inactive_since;
563-
}
564-
565543
done_testing();

src/test/recovery/t/040_standby_failover_slots_sync.pl

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@
3535
$subscriber1->init;
3636
$subscriber1->start;
3737

38+
# Capture the time before the logical failover slot is created on the
39+
# primary. We later call this publisher as primary anyway.
40+
my $slot_creation_time_on_primary = $publisher->safe_psql(
41+
'postgres', qq[
42+
SELECT current_timestamp;
43+
]);
44+
3845
# Create a slot on the publisher with failover disabled
3946
$publisher->safe_psql('postgres',
4047
"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@@ -174,6 +181,11 @@
174181
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
175182
1);
176183

184+
# Capture the inactive_since of the slot from the primary. Note that the slot
185+
# will be inactive since the corresponding subscription is disabled.
186+
my $inactive_since_on_primary =
187+
$primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
188+
177189
# Wait for the standby to catch up so that the standby is not lagging behind
178190
# the subscriber.
179191
$primary->wait_for_replay_catchup($standby1);
@@ -190,6 +202,18 @@
190202
"t",
191203
'logical slots have synced as true on standby');
192204

205+
# Capture the inactive_since of the synced slot on the standby
206+
my $inactive_since_on_standby =
207+
$standby1->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
208+
209+
# Synced slot on the standby must get its own inactive_since
210+
is( $standby1->safe_psql(
211+
'postgres',
212+
"SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz;"
213+
),
214+
"t",
215+
'synchronized slot has got its own inactive_since');
216+
193217
##################################################
194218
# Test that the synchronized slot will be dropped if the corresponding remote
195219
# slot on the primary server has been dropped.
@@ -237,6 +261,13 @@
237261
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
238262
$standby1->reload;
239263

264+
# Capture the time before the logical failover slot is created on the primary.
265+
# Note that the subscription creates the slot again on the primary.
266+
$slot_creation_time_on_primary = $publisher->safe_psql(
267+
'postgres', qq[
268+
SELECT current_timestamp;
269+
]);
270+
240271
# To ensure that restart_lsn has moved to a recent WAL position, we re-create
241272
# the subscription and the logical slot.
242273
$subscriber1->safe_psql(
@@ -257,6 +288,11 @@
257288
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
258289
1);
259290

291+
# Capture the inactive_since of the slot from the primary. Note that the slot
292+
# will be inactive since the corresponding subscription is disabled.
293+
$inactive_since_on_primary =
294+
$primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
295+
260296
# Wait for the standby to catch up so that the standby is not lagging behind
261297
# the subscriber.
262298
$primary->wait_for_replay_catchup($standby1);
@@ -808,8 +844,28 @@
808844
$standby1->start;
809845
$primary->wait_for_replay_catchup($standby1);
810846

847+
# Capture the time before the standby is promoted
848+
my $promotion_time_on_primary = $standby1->safe_psql(
849+
'postgres', qq[
850+
SELECT current_timestamp;
851+
]);
852+
811853
$standby1->promote;
812854

855+
# Capture the inactive_since of the synced slot after the promotion.
856+
# The expectation here is that the slot gets its inactive_since as part of the
857+
# promotion. We do this check before the slot is enabled on the new primary
858+
# below, otherwise, the slot gets active setting inactive_since to NULL.
859+
my $inactive_since_on_new_primary =
860+
$standby1->validate_slot_inactive_since('lsub1_slot', $promotion_time_on_primary);
861+
862+
is( $standby1->safe_psql(
863+
'postgres',
864+
"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
865+
),
866+
"t",
867+
'synchronized slot has got its own inactive_since on the new primary after promotion');
868+
813869
# Update subscription with the new primary's connection info
814870
my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
815871
$subscriber1->safe_psql('postgres',

0 commit comments

Comments
 (0)