Skip to content

Commit f8a6d8e

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
While decoding changes, if the number of changes in a (sub)transaction exceeds a certain threshold, we spill those changes to disk. This happens separately for each (sub)transaction, so a transaction with many subtransactions can have many spill files. When reading the spilled changes back, we keep each file open until all of the files have been read. If the number of such files exceeds the maximum number of file descriptors, the operation errors out.

Use the PathNameOpenFile interface to open these files, as it internally has a mechanism to release kernel FDs as needed to keep us under the max_safe_fds limit.

Reported-by: Amit Khandekar
Author: Amit Khandekar
Reviewed-by: Amit Kapila
Backpatch-through: 9.4
Discussion: https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parent 8f3e44a commit f8a6d8e

File tree

2 files changed

+90
-25
lines changed

2 files changed

+90
-25
lines changed

src/backend/replication/logical/reorderbuffer.c

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
103103
CommandId combocid; /* just for debugging */
104104
} ReorderBufferTupleCidEnt;
105105

106+
/* Virtual file descriptor with file offset tracking */
107+
typedef struct TXNEntryFile
108+
{
109+
File vfd; /* -1 when the file is closed */
110+
off_t curOffset; /* offset for next write or read. Reset to 0
111+
* when vfd is opened. */
112+
} TXNEntryFile;
113+
106114
/* k-way in-order change iteration support structures */
107115
typedef struct ReorderBufferIterTXNEntry
108116
{
109117
XLogRecPtr lsn;
110118
ReorderBufferChange *change;
111119
ReorderBufferTXN *txn;
112-
int fd;
120+
TXNEntryFile file;
113121
XLogSegNo segno;
114122
} ReorderBufferIterTXNEntry;
115123

@@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
178186
* subtransactions
179187
* ---------------------------------------
180188
*/
181-
static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
189+
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
190+
ReorderBufferIterTXNState *volatile *iter_state);
182191
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
183192
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
184193
ReorderBufferIterTXNState *state);
@@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194203
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
195204
int fd, ReorderBufferChange *change);
196205
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
197-
int *fd, XLogSegNo *segno);
206+
TXNEntryFile *file, XLogSegNo *segno);
198207
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
199208
char *change);
200209
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -945,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
945954
/*
946955
* Allocate & initialize an iterator which iterates in lsn order over a
947956
* transaction and all its subtransactions.
957+
*
958+
* Note: The iterator state is returned through iter_state parameter rather
959+
* than the function's return value. This is because the state gets cleaned up
960+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
961+
* back the state even if this function throws an exception.
948962
*/
949-
static ReorderBufferIterTXNState *
950-
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
963+
static void
964+
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
965+
ReorderBufferIterTXNState *volatile *iter_state)
951966
{
952967
Size nr_txns = 0;
953968
ReorderBufferIterTXNState *state;
954969
dlist_iter cur_txn_i;
955970
int32 off;
956971

972+
*iter_state = NULL;
973+
957974
/*
958975
* Calculate the size of our heap: one element for every transaction that
959976
* contains changes. (Besides the transactions already in the reorder
@@ -988,7 +1005,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9881005

9891006
for (off = 0; off < state->nr_txns; off++)
9901007
{
991-
state->entries[off].fd = -1;
1008+
state->entries[off].file.vfd = -1;
9921009
state->entries[off].segno = 0;
9931010
}
9941011

@@ -997,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9971014
ReorderBufferIterCompare,
9981015
state);
9991016

1017+
/* Now that the state fields are initialized, it is safe to return it. */
1018+
*iter_state = state;
1019+
10001020
/*
10011021
* Now insert items into the binary heap, in an unordered fashion. (We
10021022
* will run a heap assembly step at the end; this is more efficient.)
@@ -1013,7 +1033,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10131033
{
10141034
/* serialize remaining changes */
10151035
ReorderBufferSerializeTXN(rb, txn);
1016-
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1036+
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
10171037
&state->entries[off].segno);
10181038
}
10191039

@@ -1043,7 +1063,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10431063
/* serialize remaining changes */
10441064
ReorderBufferSerializeTXN(rb, cur_txn);
10451065
ReorderBufferRestoreChanges(rb, cur_txn,
1046-
&state->entries[off].fd,
1066+
&state->entries[off].file,
10471067
&state->entries[off].segno);
10481068
}
10491069
cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1059,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10591079

10601080
/* assemble a valid binary heap */
10611081
binaryheap_build(state->heap);
1062-
1063-
return state;
10641082
}
10651083

10661084
/*
@@ -1124,7 +1142,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
11241142
dlist_delete(&change->node);
11251143
dlist_push_tail(&state->old_change, &change->node);
11261144

1127-
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1145+
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
11281146
&state->entries[off].segno))
11291147
{
11301148
/* successfully restored changes from disk */
@@ -1163,8 +1181,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11631181

11641182
for (off = 0; off < state->nr_txns; off++)
11651183
{
1166-
if (state->entries[off].fd != -1)
1167-
CloseTransientFile(state->entries[off].fd);
1184+
if (state->entries[off].file.vfd != -1)
1185+
FileClose(state->entries[off].file.vfd);
11681186
}
11691187

11701188
/* free memory we might have "leaked" in the last *Next call */
@@ -1500,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
15001518

15011519
rb->begin(rb, txn);
15021520

1503-
iterstate = ReorderBufferIterTXNInit(rb, txn);
1521+
ReorderBufferIterTXNInit(rb, txn, &iterstate);
15041522
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
15051523
{
15061524
Relation relation = NULL;
@@ -2517,11 +2535,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
25172535
*/
25182536
static Size
25192537
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2520-
int *fd, XLogSegNo *segno)
2538+
TXNEntryFile *file, XLogSegNo *segno)
25212539
{
25222540
Size restored = 0;
25232541
XLogSegNo last_segno;
25242542
dlist_mutable_iter cleanup_iter;
2543+
File *fd = &file->vfd;
25252544

25262545
Assert(txn->first_lsn != InvalidXLogRecPtr);
25272546
Assert(txn->final_lsn != InvalidXLogRecPtr);
@@ -2562,7 +2581,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25622581
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
25632582
*segno);
25642583

2565-
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
2584+
*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
2585+
2586+
/* No harm in resetting the offset even in case of failure */
2587+
file->curOffset = 0;
2588+
25662589
if (*fd < 0 && errno == ENOENT)
25672590
{
25682591
*fd = -1;
@@ -2582,14 +2605,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25822605
* end of this file.
25832606
*/
25842607
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2585-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2586-
readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2587-
pgstat_report_wait_end();
2608+
readBytes = FileRead(file->vfd, rb->outbuf,
2609+
sizeof(ReorderBufferDiskChange),
2610+
file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
25882611

25892612
/* eof */
25902613
if (readBytes == 0)
25912614
{
2592-
CloseTransientFile(*fd);
2615+
FileClose(*fd);
25932616
*fd = -1;
25942617
(*segno)++;
25952618
continue;
@@ -2605,16 +2628,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26052628
readBytes,
26062629
(uint32) sizeof(ReorderBufferDiskChange))));
26072630

2631+
file->curOffset += readBytes;
2632+
26082633
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
26092634

26102635
ReorderBufferSerializeReserve(rb,
26112636
sizeof(ReorderBufferDiskChange) + ondisk->size);
26122637
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
26132638

2614-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2615-
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2616-
ondisk->size - sizeof(ReorderBufferDiskChange));
2617-
pgstat_report_wait_end();
2639+
readBytes = FileRead(file->vfd,
2640+
rb->outbuf + sizeof(ReorderBufferDiskChange),
2641+
ondisk->size - sizeof(ReorderBufferDiskChange),
2642+
file->curOffset,
2643+
WAIT_EVENT_REORDER_BUFFER_READ);
26182644

26192645
if (readBytes < 0)
26202646
ereport(ERROR,
@@ -2627,6 +2653,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26272653
readBytes,
26282654
(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
26292655

2656+
file->curOffset += readBytes;
2657+
26302658
/*
26312659
* ok, read a full change from disk, now restore it into proper
26322660
* in-memory format

src/test/recovery/t/006_logical_decoding.pl

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use warnings;
88
use PostgresNode;
99
use TestLib;
10-
use Test::More tests => 10;
10+
use Test::More tests => 11;
1111
use Config;
1212

1313
# Initialize master node
@@ -135,5 +135,42 @@
135135
is($node_master->slot('otherdb_slot')->{'slot_name'},
136136
undef, 'logical slot was actually dropped with DB');
137137

138+
# Test to ensure that we don't run out of file descriptors even if there
139+
# are more spill files than maxAllocatedDescs.
140+
141+
# Set max_files_per_process to a small value to make it more likely to run out
142+
# of max open file descriptors.
143+
$node_master->safe_psql('postgres',
144+
'ALTER SYSTEM SET max_files_per_process = 26;');
145+
$node_master->restart;
146+
147+
$node_master->safe_psql(
148+
'postgres', q{
149+
do $$
150+
BEGIN
151+
FOR i IN 1..10 LOOP
152+
BEGIN
153+
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
154+
EXCEPTION
155+
when division_by_zero then perform 'dummy';
156+
END;
157+
END LOOP;
158+
END $$;
159+
});
160+
161+
$result = $node_master->safe_psql('postgres',
162+
qq[
163+
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
164+
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
165+
]);
166+
167+
$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
168+
is($result, $expected, 'got expected output from spilling subxacts session');
169+
170+
# Reset max_files_per_process back to its default value
171+
$node_master->safe_psql('postgres',
172+
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
173+
$node_master->restart;
174+
138175
# done with the node
139176
$node_master->stop;

0 commit comments

Comments
 (0)