Skip to content

Commit 6d30e3a

Browse files
committed
Refrain from duplicating data in reorderbuffers
If a walsender exits leaving data in reorderbuffers, the next walsender that tries to decode the same transaction would append its decoded data in the same spill files without truncating it first, which effectively duplicates the data. Avoid that by removing any leftover reorderbuffer spill files when a walsender starts. Backpatch to 9.4; this bug has been there from the very beginning of logical decoding. Author: Craig Ringer, revised by me Reviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada
1 parent 165fa27 commit 6d30e3a

File tree

1 file changed

+82
-55
lines changed

1 file changed

+82
-55
lines changed

src/backend/replication/logical/reorderbuffer.c

Lines changed: 82 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
201201
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
202202
char *change);
203203
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
204+
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
205+
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
206+
TransactionId xid, XLogSegNo segno);
204207

205208
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
206209
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -219,7 +222,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
219222

220223

221224
/*
222-
* Allocate a new ReorderBuffer
225+
* Allocate a new ReorderBuffer and clean out any old serialized state from
226+
* prior ReorderBuffer instances for the same slot.
223227
*/
224228
ReorderBuffer *
225229
ReorderBufferAllocate(void)
@@ -228,6 +232,8 @@ ReorderBufferAllocate(void)
228232
HASHCTL hash_ctl;
229233
MemoryContext new_ctx;
230234

235+
Assert(MyReplicationSlot != NULL);
236+
231237
/* allocate memory in own context, to have better accountability */
232238
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
233239
"ReorderBuffer",
@@ -267,6 +273,13 @@ ReorderBufferAllocate(void)
267273
dlist_init(&buffer->cached_changes);
268274
slist_init(&buffer->cached_tuplebufs);
269275

276+
/*
277+
* Ensure there's no stale data from prior uses of this slot, in case some
278+
* prior exit avoided calling ReorderBufferFree. Failure to do this can
279+
* produce duplicated txns, and it's very cheap if there's nothing there.
280+
*/
281+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
282+
270283
return buffer;
271284
}
272285

@@ -283,6 +296,9 @@ ReorderBufferFree(ReorderBuffer *rb)
283296
* memory context.
284297
*/
285298
MemoryContextDelete(context);
299+
300+
/* Free disk space used by unconsumed reorder buffers */
301+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
286302
}
287303

288304
/*
@@ -1964,7 +1980,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
19641980
int fd = -1;
19651981
XLogSegNo curOpenSegNo = 0;
19661982
Size spilled = 0;
1967-
char path[MAXPGPATH];
19681983

19691984
elog(DEBUG2, "spill %u changes in XID %u to disk",
19701985
(uint32) txn->nentries_mem, txn->xid);
@@ -1991,21 +2006,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
19912006
*/
19922007
if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
19932008
{
1994-
XLogRecPtr recptr;
2009+
char path[MAXPGPATH];
19952010

19962011
if (fd != -1)
19972012
CloseTransientFile(fd);
19982013

19992014
XLByteToSeg(change->lsn, curOpenSegNo);
2000-
XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
20012015

20022016
/*
20032017
* No need to care about TLIs here, only used during a single run,
20042018
* so each LSN only maps to a specific WAL record.
20052019
*/
2006-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2007-
NameStr(MyReplicationSlot->data.name), txn->xid,
2008-
(uint32) (recptr >> 32), (uint32) recptr);
2020+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2021+
curOpenSegNo);
20092022

20102023
/* open segment, create it if necessary */
20112024
fd = OpenTransientFile(path,
@@ -2015,8 +2028,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
20152028
if (fd < 0)
20162029
ereport(ERROR,
20172030
(errcode_for_file_access(),
2018-
errmsg("could not open file \"%s\": %m",
2019-
path)));
2031+
errmsg("could not open file \"%s\": %m", path)));
20202032
}
20212033

20222034
ReorderBufferSerializeChange(rb, txn, fd, change);
@@ -2203,25 +2215,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
22032215

22042216
if (*fd == -1)
22052217
{
2206-
XLogRecPtr recptr;
22072218
char path[MAXPGPATH];
22082219

22092220
/* first time in */
22102221
if (*segno == 0)
2211-
{
22122222
XLByteToSeg(txn->first_lsn, *segno);
2213-
}
22142223

22152224
Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2216-
XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
22172225

22182226
/*
22192227
* No need to care about TLIs here, only used during a single run,
22202228
* so each LSN only maps to a specific WAL record.
22212229
*/
2222-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2223-
NameStr(MyReplicationSlot->data.name), txn->xid,
2224-
(uint32) (recptr >> 32), (uint32) recptr);
2230+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2231+
*segno);
22252232

22262233
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
22272234
if (*fd < 0 && errno == ENOENT)
@@ -2428,20 +2435,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
24282435
for (cur = first; cur <= last; cur++)
24292436
{
24302437
char path[MAXPGPATH];
2431-
XLogRecPtr recptr;
2432-
2433-
XLogSegNoOffsetToRecPtr(cur, 0, recptr);
24342438

2435-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2436-
NameStr(MyReplicationSlot->data.name), txn->xid,
2437-
(uint32) (recptr >> 32), (uint32) recptr);
2439+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
24382440
if (unlink(path) != 0 && errno != ENOENT)
24392441
ereport(ERROR,
24402442
(errcode_for_file_access(),
24412443
errmsg("could not remove file \"%s\": %m", path)));
24422444
}
24432445
}
24442446

2447+
/*
2448+
* Remove any leftover serialized reorder buffers from a slot directory after a
2449+
* prior crash or decoding session exit.
2450+
*/
2451+
static void
2452+
ReorderBufferCleanupSerializedTXNs(const char *slotname)
2453+
{
2454+
DIR *spill_dir;
2455+
struct dirent *spill_de;
2456+
struct stat statbuf;
2457+
char path[MAXPGPATH * 2 + 12];
2458+
2459+
sprintf(path, "pg_replslot/%s", slotname);
2460+
2461+
/* we're only handling directories here, skip if it's not ours */
2462+
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2463+
return;
2464+
2465+
spill_dir = AllocateDir(path);
2466+
while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2467+
{
2468+
/* only look at names that can be ours */
2469+
if (strncmp(spill_de->d_name, "xid", 3) == 0)
2470+
{
2471+
snprintf(path, sizeof(path),
2472+
"pg_replslot/%s/%s", slotname,
2473+
spill_de->d_name);
2474+
2475+
if (unlink(path) != 0)
2476+
ereport(ERROR,
2477+
(errcode_for_file_access(),
2478+
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2479+
path, slotname)));
2480+
}
2481+
}
2482+
FreeDir(spill_dir);
2483+
}
2484+
2485+
/*
2486+
* Given a replication slot, transaction ID and segment number, fill in the
2487+
* corresponding spill file into 'path', which is a caller-owned buffer of size
2488+
* at least MAXPGPATH.
2489+
*/
2490+
static void
2491+
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2492+
XLogSegNo segno)
2493+
{
2494+
XLogRecPtr recptr;
2495+
2496+
XLogSegNoOffsetToRecPtr(segno, 0, recptr);
2497+
2498+
snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2499+
NameStr(MyReplicationSlot->data.name),
2500+
xid,
2501+
(uint32) (recptr >> 32), (uint32) recptr);
2502+
}
2503+
24452504
/*
24462505
* Delete all data spilled to disk after we've restarted/crashed. It will be
24472506
* recreated when the respective slots are reused.
@@ -2452,15 +2511,9 @@ StartupReorderBuffer(void)
24522511
DIR *logical_dir;
24532512
struct dirent *logical_de;
24542513

2455-
DIR *spill_dir;
2456-
struct dirent *spill_de;
2457-
24582514
logical_dir = AllocateDir("pg_replslot");
24592515
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
24602516
{
2461-
struct stat statbuf;
2462-
char path[MAXPGPATH * 2 + 12];
2463-
24642517
if (strcmp(logical_de->d_name, ".") == 0 ||
24652518
strcmp(logical_de->d_name, "..") == 0)
24662519
continue;
@@ -2473,33 +2526,7 @@ StartupReorderBuffer(void)
24732526
* ok, has to be a surviving logical slot, iterate and delete
24742527
* everything starting with xid-*
24752528
*/
2476-
sprintf(path, "pg_replslot/%s", logical_de->d_name);
2477-
2478-
/* we're only creating directories here, skip if it's not our's */
2479-
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2480-
continue;
2481-
2482-
spill_dir = AllocateDir(path);
2483-
while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2484-
{
2485-
if (strcmp(spill_de->d_name, ".") == 0 ||
2486-
strcmp(spill_de->d_name, "..") == 0)
2487-
continue;
2488-
2489-
/* only look at names that can be ours */
2490-
if (strncmp(spill_de->d_name, "xid", 3) == 0)
2491-
{
2492-
sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2493-
spill_de->d_name);
2494-
2495-
if (unlink(path) != 0)
2496-
ereport(PANIC,
2497-
(errcode_for_file_access(),
2498-
errmsg("could not remove file \"%s\": %m",
2499-
path)));
2500-
}
2501-
}
2502-
FreeDir(spill_dir);
2529+
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
25032530
}
25042531
FreeDir(logical_dir);
25052532
}

0 commit comments

Comments
 (0)