Skip to content

Commit e04a9cc

Browse files
committed
Consistency improvements for slot and decoding code.
Change the order of checks in similar functions to be the same; remove a parameter that's not needed anymore; rename a memory context and expand a couple of comments. Per review comments from Amit Kapila
1 parent 4d92b15 commit e04a9cc

File tree

6 files changed

+22
-12
lines changed

6 files changed

+22
-12
lines changed

src/backend/access/transam/xlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6270,7 +6270,7 @@ StartupXLOG(void)
62706270
* Initialize replication slots, before there's a chance to remove
62716271
* required resources.
62726272
*/
6273-
StartupReplicationSlots(checkPoint.redo);
6273+
StartupReplicationSlots();
62746274

62756275
/*
62766276
* Startup logical state, needs to be setup now so we have proper data

src/backend/replication/logical/logical.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ StartupDecodingContext(List *output_plugin_options,
125125
slot = MyReplicationSlot;
126126

127127
context = AllocSetContextCreate(CurrentMemoryContext,
128-
"Changeset Extraction Context",
128+
"Logical Decoding Context",
129129
ALLOCSET_DEFAULT_MINSIZE,
130130
ALLOCSET_DEFAULT_INITSIZE,
131131
ALLOCSET_DEFAULT_MAXSIZE);

src/backend/replication/slot.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ CheckPointReplicationSlots(void)
829829
* needs to be run before we start crash recovery.
830830
*/
831831
void
832-
StartupReplicationSlots(XLogRecPtr checkPointRedo)
832+
StartupReplicationSlots(void)
833833
{
834834
DIR *replication_dir;
835835
struct dirent *replication_de;

src/backend/replication/slotfuncs.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
4646
HeapTuple tuple;
4747
Datum result;
4848

49-
check_permissions();
50-
51-
CheckSlotRequirements();
49+
Assert(!MyReplicationSlot);
5250

5351
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
5452
elog(ERROR, "return type must be a row type");
5553

54+
check_permissions();
55+
56+
CheckSlotRequirements();
57+
5658
/* acquire replication slot, this will check for conflicting names */
5759
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
5860

@@ -87,17 +89,20 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
8789
Datum values[2];
8890
bool nulls[2];
8991

92+
Assert(!MyReplicationSlot);
93+
9094
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
9195
elog(ERROR, "return type must be a row type");
9296

9397
check_permissions();
9498

9599
CheckLogicalDecodingRequirements();
96100

97-
Assert(!MyReplicationSlot);
98-
99101
/*
100-
* Acquire a logical decoding slot, this will check for conflicting names.
102+
* Acquire a logical decoding slot, this will check for conflicting
103+
* names. Initially create it as ephemeral - that allows us to nicely
104+
* handle errors during initialization because it'll get dropped if this
105+
* transaction fails. We'll make it persistent at the end.
101106
*/
102107
ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL);
103108

src/backend/replication/walsender.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
781781
else
782782
{
783783
CheckLogicalDecodingRequirements();
784+
/*
785+
* Initially create the slot as ephemeral - that allows us to nicely
786+
* handle errors during initialization because it'll get dropped if
787+
* this transaction fails. We'll make it persistent at the end.
788+
*/
784789
ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
785790
}
786791

@@ -1682,8 +1687,8 @@ ProcessStandbyHSFeedbackMessage(void)
16821687
* If we're using a replication slot we reserve the xmin via that,
16831688
* otherwise via the walsender's PGXACT entry.
16841689
*
1685-
* XXX: It might make sense to introduce ephemeral slots and always use
1686-
* the slot mechanism.
1690+
* XXX: It might make sense to generalize the ephemeral slot concept and
1691+
* always use the slot mechanism to handle the feedback xmin.
16871692
*/
16881693
if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
16891694
PhysicalReplicationSlotNewXmin(feedbackXmin);

src/include/replication/slot.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
164164
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
165165
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
166166

167-
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
167+
extern void StartupReplicationSlots(void);
168168
extern void CheckPointReplicationSlots(void);
169169

170170
extern void CheckSlotRequirements(void);

0 commit comments

Comments
 (0)