Skip to content

Commit 1ad47e8

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
Currently while decoding changes, if the number of changes exceeds a certain threshold, we spill those to disk.  And this happens for each (sub)transaction.  Now, while reading all these files, we don't close them until we read all the files.  While reading these files, if the number of such files exceeds the maximum number of file descriptors, the operation errors out. Use PathNameOpenFile interface to open these files as that internally has the mechanism to release kernel FDs as needed to get us under the max_safe_fds limit. Reported-by: Amit Khandekar Author: Amit Khandekar Reviewed-by: Amit Kapila Backpatch-through: 9.4 Discussion: https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parent ce758a3 commit 1ad47e8

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

src/backend/replication/logical/reorderbuffer.c

+25-15
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ typedef struct ReorderBufferIterTXNEntry
101101
XLogRecPtr lsn;
102102
ReorderBufferChange *change;
103103
ReorderBufferTXN *txn;
104-
int fd;
104+
File fd;
105105
XLogSegNo segno;
106106
} ReorderBufferIterTXNEntry;
107107

@@ -182,7 +182,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
182182
* subtransactions
183183
* ---------------------------------------
184184
*/
185-
static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
185+
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
186+
ReorderBufferIterTXNState *volatile *iter_state);
186187
static ReorderBufferChange *
187188
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
188189
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
@@ -199,7 +200,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
199200
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
200201
int fd, ReorderBufferChange *change);
201202
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
202-
int *fd, XLogSegNo *segno);
203+
File *fd, XLogSegNo *segno);
203204
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
204205
char *change);
205206
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -942,15 +943,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
942943
/*
943944
* Allocate & initialize an iterator which iterates in lsn order over a
944945
* transaction and all its subtransactions.
946+
*
947+
* Note: The iterator state is returned through iter_state parameter rather
948+
* than the function's return value. This is because the state gets cleaned up
949+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
950+
* back the state even if this function throws an exception.
945951
*/
946-
static ReorderBufferIterTXNState *
947-
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
952+
static void
953+
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
954+
ReorderBufferIterTXNState *volatile *iter_state)
948955
{
949956
Size nr_txns = 0;
950957
ReorderBufferIterTXNState *state;
951958
dlist_iter cur_txn_i;
952959
int32 off;
953960

961+
*iter_state = NULL;
962+
954963
/*
955964
* Calculate the size of our heap: one element for every transaction that
956965
* contains changes. (Besides the transactions already in the reorder
@@ -994,6 +1003,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9941003
ReorderBufferIterCompare,
9951004
state);
9961005

1006+
/* Now that the state fields are initialized, it is safe to return it. */
1007+
*iter_state = state;
1008+
9971009
/*
9981010
* Now insert items into the binary heap, in an unordered fashion. (We
9991011
* will run a heap assembly step at the end; this is more efficient.)
@@ -1056,8 +1068,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10561068

10571069
/* assemble a valid binary heap */
10581070
binaryheap_build(state->heap);
1059-
1060-
return state;
10611071
}
10621072

10631073
/*
@@ -1161,7 +1171,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11611171
for (off = 0; off < state->nr_txns; off++)
11621172
{
11631173
if (state->entries[off].fd != -1)
1164-
CloseTransientFile(state->entries[off].fd);
1174+
FileClose(state->entries[off].fd);
11651175
}
11661176

11671177
/* free memory we might have "leaked" in the last *Next call */
@@ -1496,7 +1506,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
14961506

14971507
rb->begin(rb, txn);
14981508

1499-
iterstate = ReorderBufferIterTXNInit(rb, txn);
1509+
ReorderBufferIterTXNInit(rb, txn, &iterstate);
15001510
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
15011511
{
15021512
Relation relation = NULL;
@@ -2338,7 +2348,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
23382348
*/
23392349
static Size
23402350
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2341-
int *fd, XLogSegNo *segno)
2351+
File *fd, XLogSegNo *segno)
23422352
{
23432353
Size restored = 0;
23442354
XLogSegNo last_segno;
@@ -2383,7 +2393,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23832393
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
23842394
*segno);
23852395

2386-
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2396+
*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0);
23872397
if (*fd < 0 && errno == ENOENT)
23882398
{
23892399
*fd = -1;
@@ -2404,12 +2414,12 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
24042414
* end of this file.
24052415
*/
24062416
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2407-
readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2417+
readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
24082418

24092419
/* eof */
24102420
if (readBytes == 0)
24112421
{
2412-
CloseTransientFile(*fd);
2422+
FileClose(*fd);
24132423
*fd = -1;
24142424
(*segno)++;
24152425
continue;
@@ -2431,8 +2441,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
24312441
sizeof(ReorderBufferDiskChange) + ondisk->size);
24322442
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
24332443

2434-
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2435-
ondisk->size - sizeof(ReorderBufferDiskChange));
2444+
readBytes = FileRead(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2445+
ondisk->size - sizeof(ReorderBufferDiskChange));
24362446

24372447
if (readBytes < 0)
24382448
ereport(ERROR,

0 commit comments

Comments
 (0)