From bddd73df21f9e797daf39ab46374a631e47fd1ab Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Sun, 30 Mar 2025 14:03:55 -0400 Subject: [PATCH 01/12] aio: Add errcontext for processing I/Os for another backend Push an ErrorContextCallback adding additional detail about the process performing the I/O and the owner of the I/O when those are not the same. For io_method worker, this adds context specifying which process owns the I/O that the I/O worker is processing. For io_method io_uring, this adds context only when a backend is *completing* I/O for another backend. It specifies the pid of the owning process. Author: Melanie Plageman Discussion: https://postgr.es/m/20250325141120.8e.nmisch%40google.com --- src/backend/storage/aio/method_io_uring.c | 31 +++++++++++++++++++++++ src/backend/storage/aio/method_worker.c | 29 +++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index 0bcdab14ae7e3..c719ba2727a81 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -302,14 +302,41 @@ pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) return num_staged_ios; } +static void +pgaio_uring_completion_error_callback(void *arg) +{ + ProcNumber owner; + PGPROC *owner_proc; + int32 owner_pid; + PgAioHandle *ioh = arg; + + if (!ioh) + return; + + /* No need for context if a backend is completing the IO for itself */ + if (ioh->owner_procno == MyProcNumber) + return; + + owner = ioh->owner_procno; + owner_proc = GetPGProcByNumber(owner); + owner_pid = owner_proc->pid; + + errcontext("completing I/O on behalf of process %d", owner_pid); +} + static void pgaio_uring_drain_locked(PgAioUringContext *context) { int ready; int orig_ready; + ErrorContextCallback errcallback = {0}; Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE)); + errcallback.callback = pgaio_uring_completion_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + /* * Don't drain more events than available right now. Otherwise it's * plausible that one backend could get stuck, for a while, receiving CQEs @@ -337,9 +364,11 @@ pgaio_uring_drain_locked(PgAioUringContext *context) PgAioHandle *ioh; ioh = io_uring_cqe_get_data(cqe); + errcallback.arg = ioh; io_uring_cqe_seen(&context->io_uring_ring, cqe); pgaio_io_process_completion(ioh, cqe->res); + errcallback.arg = NULL; } END_CRIT_SECTION(); @@ -348,6 +377,8 @@ pgaio_uring_drain_locked(PgAioUringContext *context) "drained %d/%d, now expecting %d", ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring)); } + + error_context_stack = errcallback.previous; } static void diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 4a7853d13fac9..31d94ac82c540 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -357,11 +357,33 @@ pgaio_worker_register(void) on_shmem_exit(pgaio_worker_die, 0); } +static void +pgaio_worker_error_callback(void *arg) +{ + ProcNumber owner; + PGPROC *owner_proc; + int32 owner_pid; + PgAioHandle *ioh = arg; + + if (!ioh) + return; + + Assert(ioh->owner_procno != MyProcNumber); + Assert(MyBackendType == B_IO_WORKER); + + owner = ioh->owner_procno; + owner_proc = GetPGProcByNumber(owner); + owner_pid = owner_proc->pid; + + errcontext("I/O worker executing I/O on behalf of process %d", owner_pid); +} + void IoWorkerMain(const void *startup_data, size_t startup_data_len) { sigjmp_buf local_sigjmp_buf; PgAioHandle *volatile error_ioh = NULL; + ErrorContextCallback errcallback = {0}; volatile int error_errno = 0; char cmd[128]; @@ -388,6 +410,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) sprintf(cmd, "%d", MyIoWorkerId); set_ps_display(cmd); + errcallback.callback = pgaio_worker_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + /* see PostgresMain() */ if (sigsetjmp(local_sigjmp_buf, 1) != 0) { @@ -471,6 +497,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) ioh = &pgaio_ctl->io_handles[io_index]; error_ioh = ioh; + errcallback.arg = ioh; pgaio_debug_io(DEBUG4, ioh, "worker %d processing IO", @@ -511,6 +538,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) pgaio_io_perform_synchronously(ioh); RESUME_INTERRUPTS(); + errcallback.arg = NULL; } else { @@ -522,6 +550,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) CHECK_FOR_INTERRUPTS(); } + error_context_stack = errcallback.previous; proc_exit(0); } From a97261db8ca6299fc4a768b6f1f42e6b88a3fec9 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 02/12] aio: Experimental heuristics to increase batching in read_stream.c Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/storage/aio/read_stream.c | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 36c54fb695b05..cec93129f582c 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -404,6 +404,30 @@ read_stream_start_pending_read(ReadStream *stream) static void read_stream_look_ahead(ReadStream *stream) { + /* + * Batch-submitting multiple IOs is more efficient than doing so + * one-by-one. If we just ramp up to the max, we'll only be allowed to + * submit one io_combine_limit sized IO. Defer submitting IO in that case. + * + * FIXME: This needs better heuristics. + */ +#if 1 + if (!stream->sync_mode && stream->distance > (io_combine_limit * 8)) + { + if (stream->pinned_buffers + stream->pending_read_nblocks > ((stream->distance * 3) / 4)) + { +#if 0 + ereport(LOG, + errmsg("reduce reduce reduce: pinned: %d, pending: %d, distance: %d", + stream->pinned_buffers, + stream->pending_read_nblocks, + stream->distance)); +#endif + return; + } + } +#endif + /* * Allow amortizing the cost of submitting IO over multiple IOs. This * requires that we don't do any operations that could lead to a deadlock From aed463d527afc41b15e397e0b526787239f6a7bc Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sat, 29 Mar 2025 13:28:52 -0400 Subject: [PATCH 03/12] aio: Implement smgr/md/fd write support TODO: - Right now the sync.c integration with smgr.c/md.c isn't properly safe to use in a critical section The only reason it doesn't immediately fail is that it's reasonably rare that RegisterSyncRequest() fails *and* either: - smgropen()->hash_search(HASH_ENTER) decides to resize the hash table, even though the lookup is guaranteed to succeed for io_method=worker. - an io_method=uring completion is run in a different backend and smgropen() needs to build a new entry and thus needs to allocate memory For a bit I thought this could be worked around easily enough by not doing an smgropen() in mdsyncfiletag(), or adding a "fallible" smgropen() and instead just opening the file directly. That actually does kinda solve the problem, but only because the memory allocation in PathNameOpenFile() uses malloc(), not palloc() and thus doesn't trigger - temp_file_limit implementation --- src/backend/storage/aio/aio_callback.c | 1 + src/backend/storage/file/fd.c | 28 ++++ src/backend/storage/smgr/md.c | 199 +++++++++++++++++++++++++ src/backend/storage/smgr/smgr.c | 29 ++++ src/include/storage/aio.h | 1 + src/include/storage/fd.h | 1 + src/include/storage/md.h | 5 + src/include/storage/smgr.h | 5 + 8 files changed, 269 insertions(+) diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index bf42778a48c70..abe5b191b4270 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c index 0e8299dd55646..2138d47dab930 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -2348,6 +2348,34 @@ FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, return returnCode; } +int +FileStartWriteV(PgAioHandle *ioh, File file, + int iovcnt, off_t offset, + uint32 wait_event_info) +{ + int returnCode; + Vfd *vfdP; + + Assert(FileIsValid(file)); + + DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d", + file, VfdCache[file].fileName, + (int64) offset, + iovcnt)); + + returnCode = FileAccess(file); + if (returnCode < 0) + return returnCode; + + vfdP = &VfdCache[file]; + + /* FIXME: think about / reimplement temp_file_limit */ + + pgaio_io_start_writev(ioh, vfdP->fd, iovcnt, offset); + + return 0; +} + int FileSync(File file, uint32 wait_event_info) { diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index f99c9d9001342..f1b09ec3e984f 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); +static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); +static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); const PgAioHandleCallbacks aio_md_readv_cb = { .complete_shared = md_readv_complete, .report = md_readv_report, }; +const PgAioHandleCallbacks aio_md_writev_cb = { + .complete_shared = md_writev_complete, + .report = md_writev_report, +}; + static inline int _mdfd_open_flags(void) @@ -1143,6 +1150,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } } +/* + * mdstartwritev() -- Asynchronous version of mdrwritev(). + */ +void +mdstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) +{ + off_t seekpos; + MdfdVec *v; + BlockNumber nblocks_this_segment; + struct iovec *iov; + int iovcnt; + int ret; + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + nblocks_this_segment = + Min(nblocks, + RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE))); + + if (nblocks_this_segment != nblocks) + elog(ERROR, "write crossing segment boundary"); + + iovcnt = pgaio_io_get_iovec(ioh, &iov); + + Assert(nblocks <= iovcnt); + + iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment); + + Assert(iovcnt <= nblocks_this_segment); + + if (!(io_direct_flags & IO_DIRECT_DATA)) + pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED); + + pgaio_io_set_target_smgr(ioh, + reln, + forknum, + blocknum, + nblocks, + skipFsync); + pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0); + + ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + if (ret != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not start writing blocks %u..%u in file \"%s\": %m", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd)))); +} + /* * mdwriteback() -- Tell the kernel to write pages back to storage. @@ -1531,6 +1596,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) } } +/* + * Like register_dirty_segment(), except for use by AIO. In the completion + * callback we don't have access to the MdfdVec (the completion callback might + * be executed in a different backend than the issuing backend), therefore we + * have to implement this slightly differently. + */ +static void +register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno) +{ + FileTag tag; + + INIT_MD_FILETAG(tag, locator, forknum, segno); + + /* + * Can't block here waiting for checkpointer to accept our sync request, + * as checkpointer might be waiting for this AIO to finish if offloaded to + * a worker. + */ + if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ )) + { + char path[MAXPGPATH]; + + ereport(DEBUG1, + (errmsg_internal("could not forward fsync request because request queue is full"))); + + /* reuse mdsyncfiletag() to avoid duplicating code */ + if (mdsyncfiletag(&tag, path)) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + path))); + } +} + /* * register_unlink_segment() -- Schedule a file to be deleted after next checkpoint */ @@ -2065,3 +2164,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel) td->smgr.nblocks * (size_t) BLCKSZ)); } } + +/* + * AIO completion callback for mdstartwritev(). + */ +static PgAioResult +md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) +{ + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + PgAioResult result = prior_result; + + if (prior_result.result < 0) + { + result.status = PGAIO_RS_ERROR; + result.id = PGAIO_HCB_MD_WRITEV; + /* For "hard" errors, track the error number in error_data */ + result.error_data = -prior_result.result; + result.result = 0; + + pgaio_result_report(result, td, LOG); + + return result; + } + + /* + * As explained above smgrstartwritev(), the smgr API operates on the + * level of blocks, rather than bytes. Convert. + */ + result.result /= BLCKSZ; + + Assert(result.result <= td->smgr.nblocks); + + if (result.result == 0) + { + /* consider 0 blocks written a failure */ + result.status = PGAIO_RS_ERROR; + result.id = PGAIO_HCB_MD_WRITEV; + result.error_data = 0; + + pgaio_result_report(result, td, LOG); + + return result; + } + + if (result.status != PGAIO_RS_ERROR && + result.result < td->smgr.nblocks) + { + /* partial writes should be retried at upper level */ + result.status = PGAIO_RS_PARTIAL; + result.id = PGAIO_HCB_MD_WRITEV; + } + + if (!td->smgr.skip_fsync) + register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum, + td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE)); + + return result; +} + +/* + * AIO error reporting callback for mdstartwritev(). + */ +static void +md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel) +{ + RelPathStr path; + + path = relpathbackend(td->smgr.rlocator, + td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER, + td->smgr.forkNum); + + if (result.error_data != 0) + { + errno = result.error_data; /* for errcode_for_file_access() */ + + ereport(elevel, + errcode_for_file_access(), + errmsg("could not write blocks %u..%u in file \"%s\": %m", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks, + path.str) + ); + } + else + { + /* + * NB: This will typically only be output in debug messages, while + * retrying a partial IO. + */ + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str, + result.result * (size_t) BLCKSZ, + td->smgr.nblocks * (size_t) BLCKSZ + ) + ); + } +} diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 4540284f5817c..abc9fdc88dcba 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -115,6 +115,11 @@ typedef struct f_smgr BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); + void (*smgr_startwritev) (PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); @@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = { .smgr_readv = mdreadv, .smgr_startreadv = mdstartreadv, .smgr_writev = mdwritev, + .smgr_startwritev = mdstartwritev, .smgr_writeback = mdwriteback, .smgr_nblocks = mdnblocks, .smgr_truncate = mdtruncate, @@ -795,6 +801,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, RESUME_INTERRUPTS(); } +/* + * smgrstartwritev() -- asynchronous version of smgrwritev() + * + * This starts an asynchronous writev IO using the IO handle `ioh`. Other than + * `ioh` all parameters are the same as smgrwritev(). + * + * Completion callbacks above smgr will be passed the result as the number of + * successfully written blocks if the write [partially] succeeds. This + * maintains the abstraction that smgr operates on the level of blocks, rather + * than bytes. + */ +void +smgrstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) +{ + HOLD_INTERRUPTS(); + smgrsw[reln->smgr_which].smgr_startwritev(ioh, + reln, forknum, blocknum, buffers, + nblocks, skipFsync); + RESUME_INTERRUPTS(); +} + /* * smgrwriteback() -- Trigger kernel writeback for the supplied range of * blocks. diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index 9fe9d9ad9fa49..bfe0d93683b1a 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -194,6 +194,7 @@ typedef enum PgAioHandleCallbackID PGAIO_HCB_INVALID = 0, PGAIO_HCB_MD_READV, + PGAIO_HCB_MD_WRITEV, PGAIO_HCB_SHARED_BUFFER_READV, diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index b77d8e5e30e3a..2cc7c5a476175 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -112,6 +112,7 @@ extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); +extern int FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileSync(File file, uint32 wait_event_info); extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info); diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 9d7131eff4384..47ae6c36c94f7 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -21,6 +21,7 @@ #include "storage/sync.h" extern const PgAioHandleCallbacks aio_md_readv_cb; +extern const PgAioHandleCallbacks aio_md_writev_cb; /* md storage manager functionality */ extern void mdinit(void); @@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh, extern void mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); +extern void mdstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync); extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 856ebcda350e9..f00b3763ac99e 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); +extern void smgrstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); From aab6be7b4892efcd886f92a5c74f1b32cf24c766 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 04/12] aio: Add bounce buffers --- src/backend/storage/aio/README.md | 27 +++ src/backend/storage/aio/aio.c | 178 ++++++++++++++++++ src/backend/storage/aio/aio_init.c | 123 ++++++++++++ src/backend/utils/misc/guc_tables.c | 13 ++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/backend/utils/resowner/resowner.c | 25 ++- src/include/storage/aio.h | 15 ++ src/include/storage/aio_internal.h | 34 ++++ src/include/storage/aio_types.h | 2 + src/include/utils/resowner.h | 2 + src/test/modules/test_aio/test_aio--1.0.sql | 21 +++ src/test/modules/test_aio/test_aio.c | 55 ++++++ src/tools/pgindent/typedefs.list | 1 + 13 files changed, 496 insertions(+), 2 deletions(-) diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md index b00de269ad926..7d9ea23228eac 100644 --- a/src/backend/storage/aio/README.md +++ b/src/backend/storage/aio/README.md @@ -406,6 +406,33 @@ shared memory no less!), completion callbacks instead have to encode errors in a more compact format that can be converted into an error message. +### AIO Bounce Buffers + +For some uses of AIO there is no convenient memory location as the source / +destination of an AIO. E.g. when data checksums are enabled, writes from +shared buffers currently cannot be done directly from shared buffers, as a +shared buffer lock still allows some modification, e.g., for hint bits (see +`FlushBuffer()`). If the write were done in-place, such modifications can +cause the checksum to fail. + +For synchronous IO this is solved by copying the buffer to separate memory +before computing the checksum and using that copy as the source buffer for the +AIO. + +However, for AIO that is not a workable solution: +- Instead of a single buffer many buffers are required, as many IOs might be + in flight +- When using the [worker method](#worker), the source/target of IO needs to be + in shared memory, otherwise the workers won't be able to access the memory. + +The AIO subsystem addresses this by providing a limited number of bounce +buffers that can be used as the source / target for IO. A bounce buffer can be +acquired with `pgaio_bounce_buffer_get()` and multiple bounce buffers can be +associated with an AIO Handle with `pgaio_io_assoc_bounce_buffer()`. + +Bounce buffers are automatically released when the IO completes. + + ## Helpers Using the low-level AIO API introduces too much complexity to do so all over diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 86f7250b7a5f2..cff48964d0760 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -62,6 +62,8 @@ static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation static const char *pgaio_io_state_get_name(PgAioHandleState s); static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation); +static void pgaio_bounce_buffer_wait_for_free(void); + /* Options for io_method. */ const struct config_enum_entry io_method_options[] = { @@ -76,6 +78,7 @@ const struct config_enum_entry io_method_options[] = { /* GUCs */ int io_method = DEFAULT_IO_METHOD; int io_max_concurrency = -1; +int io_bounce_buffers = -1; /* global control for AIO */ PgAioCtl *pgaio_ctl; @@ -662,6 +665,21 @@ pgaio_io_reclaim(PgAioHandle *ioh) if (ioh->state != PGAIO_HS_HANDED_OUT) dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node); + /* reclaim all associated bounce buffers */ + if (!slist_is_empty(&ioh->bounce_buffers)) + { + slist_mutable_iter it; + + slist_foreach_modify(it, &ioh->bounce_buffers) + { + PgAioBounceBuffer *bb = slist_container(PgAioBounceBuffer, node, it.cur); + + slist_delete_current(&it); + + slist_push_head(&pgaio_my_backend->idle_bbs, &bb->node); + } + } + if (ioh->resowner) { ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); @@ -1046,6 +1064,166 @@ pgaio_submit_staged(void) +/* -------------------------------------------------------------------------------- + * Functions primarily related to PgAioBounceBuffer + * -------------------------------------------------------------------------------- + */ + +PgAioBounceBuffer * +pgaio_bounce_buffer_get(void) +{ + PgAioBounceBuffer *bb = NULL; + slist_node *node; + + if (pgaio_my_backend->handed_out_bb != NULL) + elog(ERROR, "can only hand out one BB"); + + /* + * XXX: It probably is not a good idea to have bounce buffers be per + * backend, that's a fair bit of memory. + */ + if (slist_is_empty(&pgaio_my_backend->idle_bbs)) + { + pgaio_bounce_buffer_wait_for_free(); + } + + node = slist_pop_head_node(&pgaio_my_backend->idle_bbs); + bb = slist_container(PgAioBounceBuffer, node, node); + + pgaio_my_backend->handed_out_bb = bb; + + bb->resowner = CurrentResourceOwner; + ResourceOwnerRememberAioBounceBuffer(bb->resowner, &bb->resowner_node); + + return bb; +} + +void +pgaio_io_assoc_bounce_buffer(PgAioHandle *ioh, PgAioBounceBuffer *bb) +{ + if (pgaio_my_backend->handed_out_bb != bb) + elog(ERROR, "can only assign handed out BB"); + pgaio_my_backend->handed_out_bb = NULL; + + /* + * There can be many bounce buffers assigned in case of vectorized IOs. + */ + slist_push_head(&ioh->bounce_buffers, &bb->node); + + /* once associated with an IO, the IO has ownership */ + ResourceOwnerForgetAioBounceBuffer(bb->resowner, &bb->resowner_node); + bb->resowner = NULL; +} + +uint32 +pgaio_bounce_buffer_id(PgAioBounceBuffer *bb) +{ + return bb - pgaio_ctl->bounce_buffers; +} + +void +pgaio_bounce_buffer_release(PgAioBounceBuffer *bb) +{ + if (pgaio_my_backend->handed_out_bb != bb) + elog(ERROR, "can only release handed out BB"); + + slist_push_head(&pgaio_my_backend->idle_bbs, &bb->node); + pgaio_my_backend->handed_out_bb = NULL; + + ResourceOwnerForgetAioBounceBuffer(bb->resowner, &bb->resowner_node); + bb->resowner = NULL; +} + +void +pgaio_bounce_buffer_release_resowner(dlist_node *bb_node, bool on_error) +{ + PgAioBounceBuffer *bb = dlist_container(PgAioBounceBuffer, resowner_node, bb_node); + + Assert(bb->resowner); + + if (!on_error) + elog(WARNING, "leaked AIO bounce buffer"); + + pgaio_bounce_buffer_release(bb); +} + +char * +pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb) +{ + return bb->buffer; +} + +static void +pgaio_bounce_buffer_wait_for_free(void) +{ + static uint32 lastpos = 0; + + if (pgaio_my_backend->num_staged_ios > 0) + { + pgaio_debug(DEBUG2, "submitting %d, while acquiring free bb", + pgaio_my_backend->num_staged_ios); + pgaio_submit_staged(); + } + + for (uint32 i = lastpos; i < lastpos + io_max_concurrency; i++) + { + uint32 thisoff = pgaio_my_backend->io_handle_off + (i % io_max_concurrency); + PgAioHandle *ioh = &pgaio_ctl->io_handles[thisoff]; + + switch (ioh->state) + { + case PGAIO_HS_IDLE: + case PGAIO_HS_HANDED_OUT: + continue; + case PGAIO_HS_DEFINED: /* should have been submitted above */ + case PGAIO_HS_STAGED: + elog(ERROR, "shouldn't get here with io:%d in state %d", + pgaio_io_get_id(ioh), ioh->state); + break; + case PGAIO_HS_COMPLETED_IO: + case PGAIO_HS_SUBMITTED: + if (!slist_is_empty(&ioh->bounce_buffers)) + { + pgaio_debug_io(DEBUG2, ioh, + "waiting for IO to reclaim BB with %d in flight", + dclist_count(&pgaio_my_backend->in_flight_ios)); + + /* see comment in pgaio_io_wait_for_free() about raciness */ + pgaio_io_wait(ioh, ioh->generation); + + if (slist_is_empty(&pgaio_my_backend->idle_bbs)) + elog(WARNING, "empty after wait"); + + if (!slist_is_empty(&pgaio_my_backend->idle_bbs)) + { + lastpos = i; + return; + } + } + break; + case PGAIO_HS_COMPLETED_SHARED: + case PGAIO_HS_COMPLETED_LOCAL: + /* reclaim */ + pgaio_io_reclaim(ioh); + + if (!slist_is_empty(&pgaio_my_backend->idle_bbs)) + { + lastpos = i; + return; + } + break; + } + } + + /* + * The submission above could have caused the IO to complete at any time. + */ + if (slist_is_empty(&pgaio_my_backend->idle_bbs)) + elog(PANIC, "no more bbs"); +} + + + /* -------------------------------------------------------------------------------- * Other * -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c index 885c3940c6626..95b10933fedbf 100644 --- a/src/backend/storage/aio/aio_init.c +++ b/src/backend/storage/aio/aio_init.c @@ -88,6 +88,32 @@ AioHandleDataShmemSize(void) io_max_concurrency)); } +static Size +AioBounceBufferDescShmemSize(void) +{ + Size sz; + + /* PgAioBounceBuffer itself */ + sz = mul_size(sizeof(PgAioBounceBuffer), + mul_size(AioProcs(), io_bounce_buffers)); + + return sz; +} + +static Size +AioBounceBufferDataShmemSize(void) +{ + Size sz; + + /* and the associated buffer */ + sz = mul_size(BLCKSZ, + mul_size(io_bounce_buffers, AioProcs())); + /* memory for alignment */ + sz += BLCKSZ; + + return sz; +} + /* * Choose a suitable value for io_max_concurrency. * @@ -113,6 +139,33 @@ AioChooseMaxConcurrency(void) return Min(max_proportional_pins, 64); } +/* + * Choose a suitable value for io_bounce_buffers. + * + * It's very unlikely to be useful to allocate more bounce buffers for each + * backend than the backend is allowed to pin. Additionally, bounce buffers + * currently are used for writes, it seems very uncommon for more than 10% of + * shared_buffers to be written out concurrently. + * + * XXX: This quickly can take up significant amounts of memory, the logic + * should probably fine tuned. + */ +static int +AioChooseBounceBuffers(void) +{ + uint32 max_backends; + int max_proportional_pins; + + /* Similar logic to LimitAdditionalPins() */ + max_backends = MaxBackends + NUM_AUXILIARY_PROCS; + max_proportional_pins = (NBuffers / 10) / max_backends; + + max_proportional_pins = Max(max_proportional_pins, 1); + + /* apply upper limit */ + return Min(max_proportional_pins, 256); +} + Size AioShmemSize(void) { @@ -136,11 +189,31 @@ AioShmemSize(void) PGC_S_OVERRIDE); } + + /* + * If io_bounce_buffers is -1, we automatically choose a suitable value. + * + * See also comment above. + */ + if (io_bounce_buffers == -1) + { + char buf[32]; + + snprintf(buf, sizeof(buf), "%d", AioChooseBounceBuffers()); + SetConfigOption("io_bounce_buffers", buf, PGC_POSTMASTER, + PGC_S_DYNAMIC_DEFAULT); + if (io_bounce_buffers == -1) /* failed to apply it? */ + SetConfigOption("io_bounce_buffers", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + sz = add_size(sz, AioCtlShmemSize()); sz = add_size(sz, AioBackendShmemSize()); sz = add_size(sz, AioHandleShmemSize()); sz = add_size(sz, AioHandleIOVShmemSize()); sz = add_size(sz, AioHandleDataShmemSize()); + sz = add_size(sz, AioBounceBufferDescShmemSize()); + sz = add_size(sz, AioBounceBufferDataShmemSize()); /* Reserve space for method specific resources. */ if (pgaio_method_ops->shmem_size) @@ -156,6 +229,9 @@ AioShmemInit(void) uint32 io_handle_off = 0; uint32 iovec_off = 0; uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit; + uint32 bounce_buffers_off = 0; + uint32 per_backend_bb = io_bounce_buffers; + char *bounce_buffers_data; pgaio_ctl = (PgAioCtl *) ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found); @@ -167,6 +243,7 @@ AioShmemInit(void) pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency; pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs; + pgaio_ctl->bounce_buffers_count = AioProcs() * per_backend_bb; pgaio_ctl->backend_state = (PgAioBackend *) ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found); @@ -179,6 +256,40 @@ AioShmemInit(void) pgaio_ctl->handle_data = (uint64 *) ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found); + pgaio_ctl->bounce_buffers = (PgAioBounceBuffer *) + ShmemInitStruct("AioBounceBufferDesc", AioBounceBufferDescShmemSize(), + &found); + + bounce_buffers_data = + ShmemInitStruct("AioBounceBufferData", AioBounceBufferDataShmemSize(), + &found); + bounce_buffers_data = + (char *) TYPEALIGN(BLCKSZ, (uintptr_t) bounce_buffers_data); + pgaio_ctl->bounce_buffers_data = bounce_buffers_data; + + + /* Initialize IO handles. */ + for (uint64 i = 0; i < pgaio_ctl->io_handle_count; i++) + { + PgAioHandle *ioh = &pgaio_ctl->io_handles[i]; + + ioh->op = PGAIO_OP_INVALID; + ioh->target = PGAIO_TID_INVALID; + ioh->state = PGAIO_HS_IDLE; + + slist_init(&ioh->bounce_buffers); + } + + /* Initialize Bounce Buffers. */ + for (uint64 i = 0; i < pgaio_ctl->bounce_buffers_count; i++) + { + PgAioBounceBuffer *bb = &pgaio_ctl->bounce_buffers[i]; + + bb->buffer = bounce_buffers_data; + bounce_buffers_data += BLCKSZ; + } + + for (int procno = 0; procno < AioProcs(); procno++) { PgAioBackend *bs = &pgaio_ctl->backend_state[procno]; @@ -186,9 +297,13 @@ AioShmemInit(void) bs->io_handle_off = io_handle_off; io_handle_off += io_max_concurrency; + bs->bounce_buffers_off = bounce_buffers_off; + bounce_buffers_off += per_backend_bb; + dclist_init(&bs->idle_ios); memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE); dclist_init(&bs->in_flight_ios); + slist_init(&bs->idle_bbs); /* initialize per-backend IOs */ for (int i = 0; i < io_max_concurrency; i++) @@ -210,6 +325,14 @@ AioShmemInit(void) dclist_push_tail(&bs->idle_ios, &ioh->node); iovec_off += io_max_combine_limit; } + + /* initialize per-backend bounce buffers */ + for (int i = 0; i < per_backend_bb; i++) + { + PgAioBounceBuffer *bb = &pgaio_ctl->bounce_buffers[bs->bounce_buffers_off + i]; + + slist_push_head(&bs->idle_bbs, &bb->node); + } } out: diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 4eaeca89f2c7a..bd49b302293c4 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3293,6 +3293,19 @@ struct config_int ConfigureNamesInt[] = check_io_max_concurrency, NULL, NULL }, + { + {"io_bounce_buffers", + PGC_POSTMASTER, + RESOURCES_IO, + gettext_noop("Number of IO Bounce Buffers reserved for each backend."), + NULL, + GUC_UNIT_BLOCKS + }, + &io_bounce_buffers, + -1, -1, 4096, + NULL, NULL, NULL + }, + { {"io_workers", PGC_SIGHUP, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index ff56a1f0732c5..2c6456e907f54 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -211,6 +211,8 @@ # -1 sets based on shared_buffers # (change requires restart) #io_workers = 3 # 1-32; +#io_bounce_buffers = -1 # -1 sets based on shared_buffers + # (change requires restart) # - Worker Processes - diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c index d39f3e1b655cd..81e7e27965a6f 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -159,10 +159,11 @@ struct ResourceOwnerData LOCALLOCK *locks[MAX_RESOWNER_LOCKS]; /* list of owned locks */ /* - * AIO handles need be registered in critical sections and therefore - * cannot use the normal ResourceElem mechanism. + * AIO handles & bounce buffers need be registered in critical sections + * and therefore cannot use the normal ResourceElem mechanism. */ dlist_head aio_handles; + dlist_head aio_bounce_buffers; }; @@ -434,6 +435,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name) } dlist_init(&owner->aio_handles); + dlist_init(&owner->aio_bounce_buffers); return owner; } @@ -742,6 +744,13 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, pgaio_io_release_resowner(node, !isCommit); } + + while (!dlist_is_empty(&owner->aio_bounce_buffers)) + { + dlist_node *node = dlist_head_node(&owner->aio_bounce_buffers); + + pgaio_bounce_buffer_release_resowner(node, !isCommit); + } } else if (phase == RESOURCE_RELEASE_LOCKS) { @@ -1111,3 +1120,15 @@ ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node) { dlist_delete_from(&owner->aio_handles, ioh_node); } + +void +ResourceOwnerRememberAioBounceBuffer(ResourceOwner owner, struct dlist_node *ioh_node) +{ + dlist_push_tail(&owner->aio_bounce_buffers, ioh_node); +} + +void +ResourceOwnerForgetAioBounceBuffer(ResourceOwner owner, struct dlist_node *ioh_node) +{ + dlist_delete_from(&owner->aio_bounce_buffers, ioh_node); +} diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index bfe0d93683b1a..f91f0afc5a580 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -353,6 +353,20 @@ extern bool pgaio_have_staged(void); +/* -------------------------------------------------------------------------------- + * Bounce Buffers + * -------------------------------------------------------------------------------- + */ + +extern PgAioBounceBuffer *pgaio_bounce_buffer_get(void); +extern void pgaio_io_assoc_bounce_buffer(PgAioHandle *ioh, PgAioBounceBuffer *bb); +extern uint32 pgaio_bounce_buffer_id(PgAioBounceBuffer *bb); +extern void pgaio_bounce_buffer_release(PgAioBounceBuffer *bb); +extern char *pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb); +extern void pgaio_bounce_buffer_release_resowner(struct dlist_node *bb_node, bool on_error); + + + /* -------------------------------------------------------------------------------- * Other * -------------------------------------------------------------------------------- @@ -365,6 +379,7 @@ extern void pgaio_closing_fd(int fd); /* GUCs */ extern PGDLLIMPORT int io_method; extern PGDLLIMPORT int io_max_concurrency; +extern PGDLLIMPORT int io_bounce_buffers; #endif /* AIO_H */ diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h index 7f18da2c85651..833f97361a1ff 100644 --- a/src/include/storage/aio_internal.h +++ b/src/include/storage/aio_internal.h @@ -127,6 +127,12 @@ struct PgAioHandle /* raw result of the IO operation */ int32 result; + /* + * List of bounce_buffers owned by IO. It would suffice to use an index + * based linked list here. + */ + slist_head bounce_buffers; + /** * In which list the handle is registered, depends on the state: * - IDLE, in per-backend list @@ -182,11 +188,24 @@ struct PgAioHandle }; +/* typedef is in aio_types.h */ +struct PgAioBounceBuffer +{ + slist_node node; + struct ResourceOwnerData *resowner; + dlist_node resowner_node; + char *buffer; +}; + + typedef struct PgAioBackend { /* index into PgAioCtl->io_handles */ uint32 io_handle_off; + /* index into PgAioCtl->bounce_buffers */ + uint32 bounce_buffers_off; + /* IO Handles that currently are not used */ dclist_head idle_ios; @@ -217,6 +236,12 @@ typedef struct PgAioBackend * IOs being appended at the end. */ dclist_head in_flight_ios; + + /* Bounce Buffers that currently are not used */ + slist_head idle_bbs; + + /* see handed_out_io */ + PgAioBounceBuffer *handed_out_bb; } PgAioBackend; @@ -244,6 +269,15 @@ typedef struct PgAioCtl uint32 io_handle_count; PgAioHandle *io_handles; + + /* + * To perform AIO on buffers that are not located in shared memory (either + * because they are not in shared memory or because we need to operate on + * a copy, as e.g. the case for writes when checksums are in use) + */ + uint32 bounce_buffers_count; + PgAioBounceBuffer *bounce_buffers; + char *bounce_buffers_data; } PgAioCtl; diff --git a/src/include/storage/aio_types.h b/src/include/storage/aio_types.h index 181833660778e..3c18dade49cb9 100644 --- a/src/include/storage/aio_types.h +++ b/src/include/storage/aio_types.h @@ -134,4 +134,6 @@ typedef struct PgAioReturn } PgAioReturn; +typedef struct PgAioBounceBuffer PgAioBounceBuffer; + #endif /* AIO_TYPES_H */ diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h index aede4bfc820a3..7e2ec2241693f 100644 --- a/src/include/utils/resowner.h +++ b/src/include/utils/resowner.h @@ -168,5 +168,7 @@ extern void ResourceOwnerForgetLock(ResourceOwner owner, struct LOCALLOCK *local struct dlist_node; extern void ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node); extern void ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node); +extern void ResourceOwnerRememberAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node); +extern void ResourceOwnerForgetAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node); #endif /* RESOWNER_H */ diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e43..e2a8123516626 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -87,6 +87,27 @@ RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION bb_get_and_error() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_get_twice() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_get() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_get_release() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_release_last() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + + /* * Injection point related functions diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index bef0ecd90078f..d5ad374b3cef1 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -52,6 +52,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static PgAioHandle *last_handle; +static PgAioBounceBuffer *last_bb; @@ -671,6 +672,60 @@ batch_end(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(bb_get); +Datum +bb_get(PG_FUNCTION_ARGS) +{ + last_bb = pgaio_bounce_buffer_get(); + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(bb_release_last); +Datum +bb_release_last(PG_FUNCTION_ARGS) +{ + if (!last_bb) + elog(ERROR, "no bb"); + + pgaio_bounce_buffer_release(last_bb); + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(bb_get_and_error); +Datum +bb_get_and_error(PG_FUNCTION_ARGS) +{ + pgaio_bounce_buffer_get(); + + elog(ERROR, "as you command"); + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(bb_get_twice); +Datum +bb_get_twice(PG_FUNCTION_ARGS) +{ + pgaio_bounce_buffer_get(); + pgaio_bounce_buffer_get(); + + PG_RETURN_VOID(); +} + + +PG_FUNCTION_INFO_V1(bb_get_release); +Datum +bb_get_release(PG_FUNCTION_ARGS) +{ + PgAioBounceBuffer *bb; + + bb = pgaio_bounce_buffer_get(); + pgaio_bounce_buffer_release(bb); + + PG_RETURN_VOID(); +} + #ifdef USE_INJECTION_POINTS extern PGDLLEXPORT void inj_io_short_read(const char *name, const void *private_data); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b66cecd879910..3a67ee01b4692 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2137,6 +2137,7 @@ PermutationStep PermutationStepBlocker PermutationStepBlockerType PgAioBackend +PgAioBounceBuffer PgAioCtl PgAioHandle PgAioHandleCallbackID From 39400aebdd4fd7dc2206784a45acbd25b6798bfb Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 30 Mar 2025 16:43:54 -0400 Subject: [PATCH 05/12] bufmgr: Implement AIO write support As of this commit there are no users of these AIO facilities, that'll come in later commits. Problems with AIO writes: - Write logic needs to be rebased on-top of the patch series to not hit bit dirty buffers while IO is going on The performance impact of doing the memory copies is rather substantial, as on intel memory bandwidth is *the* IO bottleneck even just for the checksum computation, without a copy. That makes the memory copy for something like bounce buffers hurt really badly. And the memory usage of bounce buffers is also really concerning. And even without checksums, several filesystems *really* don't like buffers getting modified during DIO writes. Which I think would mean we ought to use bounce buffers for *all* writes, which would impose a *very* substantial overhead (basically removing the benefit of DMA happening off-cpu). - I think it requires new lwlock.c infrastructure (as v1 of aio had), to make LockBuffer(BUFFER_LOCK_EXCLUSIVE) etc wait in a concurrency safe manner for in-progress writes I can think of ways to solve this purely in bufmgr.c, but only in ways that would cause other problems (e.g. setting BM_IO_IN_PROGRESS before waiting for an exclusive lock) and/or expensive. --- src/backend/storage/aio/aio_callback.c | 2 + src/backend/storage/buffer/bufmgr.c | 189 ++++++++++++++++++++++++- src/include/storage/aio.h | 4 +- src/include/storage/bufmgr.h | 2 + 4 files changed, 190 insertions(+), 7 deletions(-) diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index abe5b191b4270..21665e7eccb31 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -44,8 +44,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_WRITEV, aio_shared_buffer_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_WRITEV, aio_local_buffer_writev_cb), #undef CALLBACK_ENTRY }; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 1c37d7dfe2f92..9bf30c44af015 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5536,7 +5536,15 @@ LockBuffer(Buffer buffer, int mode) else if (mode == BUFFER_LOCK_SHARE) LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED); else if (mode == BUFFER_LOCK_EXCLUSIVE) + { + /* + * FIXME: Wait for AIO writes, otherwise there would be a risk of + * deadlock. This isn't entirely trivial to do in a race-free way, IO + * could be started between us checking whether there is IO and + * enqueueing ourselves for the lock. + */ LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE); + } else elog(ERROR, "unrecognized buffer lock mode: %d", mode); } @@ -5551,6 +5559,19 @@ ConditionalLockBuffer(Buffer buffer) { BufferDesc *buf; + /* + * FIXME: Wait for AIO writes. Some code does not deal well + * ConditionalLockBuffer() continuously failing, e.g. + * spginsert()->spgdoinsert() ends up busy-looping (easily reproducible by + * just making this function always fail and running the regression + * tests). While that code could be fixed, it'd be hard to find all + * problematic places. + * + * It would be OK to wait for the IO as waiting for IO completion does not + * need to wait for any locks that could lead to an undetected deadlock or + * such. + */ + Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) return true; /* act as though we got it */ @@ -5614,10 +5635,8 @@ LockBufferForCleanup(Buffer buffer) CheckBufferIsPinnedOnce(buffer); /* - * We do not yet need to be worried about in-progress AIOs holding a pin, - * as we, so far, only support doing reads via AIO and this function can - * only be called once the buffer is valid (i.e. no read can be in - * flight). + * FIXME: See AIO related comments in LockBuffer() and + * ConditionalLockBuffer() */ /* Nobody else to wait for */ @@ -5630,6 +5649,11 @@ LockBufferForCleanup(Buffer buffer) { uint32 buf_state; + /* + * FIXME: LockBuffer()'s handling of in-progress writes (once + * implemented) should suffice to deal with deadlock risk. + */ + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); buf_state = LockBufHdr(bufHdr); @@ -5777,7 +5801,13 @@ ConditionalLockBufferForCleanup(Buffer buffer) Assert(BufferIsValid(buffer)); - /* see AIO related comment in LockBufferForCleanup() */ + /* + * FIXME: Should wait for IO for the same reason as in + * ConditionalLockBuffer(). Needs to happen before the + * ConditionalLockBuffer() call below, as we'd never reach the + * ConditionalLockBuffer() call due the buffer pin held for the duration + * of the IO. + */ if (BufferIsLocal(buffer)) { @@ -5834,7 +5864,10 @@ IsBufferCleanupOK(Buffer buffer) Assert(BufferIsValid(buffer)); - /* see AIO related comment in LockBufferForCleanup() */ + /* + * FIXME: See AIO related comments in LockBuffer() and + * ConditionalLockBuffer() + */ if (BufferIsLocal(buffer)) { @@ -7140,12 +7173,129 @@ buffer_readv_report(PgAioResult result, const PgAioTargetData *td, affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0); } +/* + * Helper for AIO writev completion callbacks, supporting both shared and temp + * buffers. Gets called once for each buffer in a multi-page write. + */ +static pg_attribute_always_inline PgAioResult +buffer_writev_complete_one(uint8 buf_off, Buffer buffer, uint8 flags, + bool failed, bool is_temp) +{ + BufferDesc *buf_hdr = is_temp ? + GetLocalBufferDescriptor(-buffer - 1) + : GetBufferDescriptor(buffer - 1); + PgAioResult result = {.status = PGAIO_RS_OK}; + bool clear_dirty; + uint32 set_flag_bits; + +#ifdef USE_ASSERT_CHECKING + { + uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state); + + Assert(buf_state & BM_VALID); + Assert(buf_state & BM_TAG_VALID); + /* temp buffers don't use BM_IO_IN_PROGRESS */ + if (!is_temp) + Assert(buf_state & BM_IO_IN_PROGRESS); + Assert(buf_state & BM_DIRTY); + } +#endif + + clear_dirty = failed ? false : true; + set_flag_bits = failed ? BM_IO_ERROR : 0; + + if (is_temp) + TerminateLocalBufferIO(buf_hdr, clear_dirty, set_flag_bits, true); + else + TerminateBufferIO(buf_hdr, clear_dirty, set_flag_bits, false, true); + + /* + * The initiator of IO is not managing the lock (i.e. we called + * LWLockDisown()), we are. + */ + if (!is_temp) + LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr), + LW_SHARED); + + /* FIXME: tracepoint */ + + return result; +} + +/* + * Perform completion handling of a single AIO write. This write may cover + * multiple blocks / buffers. + * + * Shared between shared and local buffers, to reduce code duplication. + */ +static pg_attribute_always_inline PgAioResult +buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data, bool is_temp) +{ + PgAioResult result = prior_result; + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + uint64 *io_data; + uint8 handle_data_len; + + if (is_temp) + { + Assert(td->smgr.is_temp); + Assert(pgaio_io_get_owner(ioh) == MyProcNumber); + } + else + Assert(!td->smgr.is_temp); + + /* + * Iterate over all the buffers affected by this IO and call appropriate + * per-buffer completion function for each buffer. + */ + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++) + { + Buffer buf = io_data[buf_off]; + PgAioResult buf_result; + bool failed; + + Assert(BufferIsValid(buf)); + + /* + * If the entire failed on a lower-level, each buffer needs to be + * marked as failed. In case of a partial read, some buffers may be + * ok. + */ + failed = + prior_result.status == PGAIO_RS_ERROR + || prior_result.result <= buf_off; + + buf_result = buffer_writev_complete_one(buf_off, buf, cb_data, failed, + is_temp); + + /* + * If there wasn't any prior error and the IO for this page failed in + * some form, set the whole IO's to the page's result. + */ + if (result.status != PGAIO_RS_ERROR && buf_result.status != PGAIO_RS_OK) + { + result = buf_result; + pgaio_result_report(result, td, LOG); + } + } + + return result; +} + static void shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) { buffer_stage_common(ioh, false, false); } +static void +shared_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data) +{ + buffer_stage_common(ioh, true, false); +} + static PgAioResult shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) @@ -7191,6 +7341,13 @@ shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result, return prior_result; } +static PgAioResult +shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data) +{ + return buffer_writev_complete(ioh, prior_result, cb_data, false); +} + static void local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) { @@ -7204,6 +7361,17 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, return buffer_readv_complete(ioh, prior_result, cb_data, true); } +static void +local_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data) +{ + /* + * Currently this is unreachable as the only write support is for + * checkpointer / bgwriter, which don't deal with local buffers. + */ + elog(ERROR, "should be unreachable"); +} + + /* readv callback is passed READ_BUFFERS_* flags as callback data */ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { .stage = shared_buffer_readv_stage, @@ -7213,6 +7381,11 @@ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { .report = buffer_readv_report, }; +const PgAioHandleCallbacks aio_shared_buffer_writev_cb = { + .stage = shared_buffer_writev_stage, + .complete_shared = shared_buffer_writev_complete, +}; + /* readv callback is passed READ_BUFFERS_* flags as callback data */ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .stage = local_buffer_readv_stage, @@ -7226,3 +7399,7 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .complete_local = local_buffer_readv_complete, .report = buffer_readv_report, }; + +const PgAioHandleCallbacks aio_local_buffer_writev_cb = { + .stage = local_buffer_writev_stage, +}; diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index f91f0afc5a580..72d5680e767e9 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -197,11 +197,13 @@ typedef enum PgAioHandleCallbackID PGAIO_HCB_MD_WRITEV, PGAIO_HCB_SHARED_BUFFER_READV, + PGAIO_HCB_SHARED_BUFFER_WRITEV, PGAIO_HCB_LOCAL_BUFFER_READV, + PGAIO_HCB_LOCAL_BUFFER_WRITEV, } PgAioHandleCallbackID; -#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV +#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_WRITEV StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS), "PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS"); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index f2192ceb2719b..492feab0cb561 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -174,7 +174,9 @@ extern PGDLLIMPORT int backend_flush_after; extern PGDLLIMPORT int bgwriter_flush_after; extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_shared_buffer_writev_cb; extern const PgAioHandleCallbacks aio_local_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_local_buffer_writev_cb; /* in buf_init.c */ extern PGDLLIMPORT char *BufferBlocks; From 66d2e25a83daad923c0cbd77871b5bc074ae972d Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 06/12] aio: Add IO queue helper This is likely never going to anywhere - Thomas Munro is working on something more complete. But I needed a way to exercise aio for checkpointer / bgwriter. --- src/backend/storage/aio/Makefile | 1 + src/backend/storage/aio/io_queue.c | 204 ++++++++++++++++++++++++++++ src/backend/storage/aio/meson.build | 1 + src/include/storage/io_queue.h | 33 +++++ src/tools/pgindent/typedefs.list | 2 + 5 files changed, 241 insertions(+) create mode 100644 src/backend/storage/aio/io_queue.c create mode 100644 src/include/storage/io_queue.h diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile index 3f2469cc39945..86fa4276fda99 100644 --- a/src/backend/storage/aio/Makefile +++ b/src/backend/storage/aio/Makefile @@ -15,6 +15,7 @@ OBJS = \ aio_init.o \ aio_io.o \ aio_target.o \ + io_queue.o \ method_io_uring.o \ method_sync.o \ method_worker.o \ diff --git a/src/backend/storage/aio/io_queue.c b/src/backend/storage/aio/io_queue.c new file mode 100644 index 0000000000000..526aa1d5e06da --- /dev/null +++ b/src/backend/storage/aio/io_queue.c @@ -0,0 +1,204 @@ +/*------------------------------------------------------------------------- + * + * io_queue.c + * AIO - Mechanism for tracking many IOs + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/io_queue.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "lib/ilist.h" +#include "storage/aio.h" +#include "storage/io_queue.h" +#include "utils/resowner.h" + + + +typedef struct TrackedIO +{ + PgAioWaitRef iow; + dlist_node node; +} TrackedIO; + +struct IOQueue +{ + int depth; + int unsubmitted; + + bool has_reserved; + + dclist_head idle; + dclist_head in_progress; + + TrackedIO tracked_ios[FLEXIBLE_ARRAY_MEMBER]; +}; + + +IOQueue * +io_queue_create(int depth, int flags) +{ + size_t sz; + IOQueue *ioq; + + sz = offsetof(IOQueue, tracked_ios) + + sizeof(TrackedIO) * depth; + + ioq = palloc0(sz); + + ioq->depth = 0; + + for (int i = 0; i < depth; i++) + { + TrackedIO *tio = &ioq->tracked_ios[i]; + + pgaio_wref_clear(&tio->iow); + dclist_push_tail(&ioq->idle, &tio->node); + } + + return ioq; +} + +void +io_queue_wait_one(IOQueue *ioq) +{ + /* submit all pending IO before waiting */ + pgaio_submit_staged(); + + while (!dclist_is_empty(&ioq->in_progress)) + { + /* FIXME: Should we really pop here already? */ + dlist_node *node = dclist_pop_head_node(&ioq->in_progress); + TrackedIO *tio = dclist_container(TrackedIO, node, node); + + pgaio_wref_wait(&tio->iow); + dclist_push_head(&ioq->idle, &tio->node); + } +} + +void +io_queue_reserve(IOQueue *ioq) +{ + if (ioq->has_reserved) + return; + + if (dclist_is_empty(&ioq->idle)) + io_queue_wait_one(ioq); + + Assert(!dclist_is_empty(&ioq->idle)); + + ioq->has_reserved = true; +} + +PgAioHandle * +io_queue_acquire_io(IOQueue *ioq) +{ + PgAioHandle *ioh; + + io_queue_reserve(ioq); + + Assert(!dclist_is_empty(&ioq->idle)); + + if (!io_queue_is_empty(ioq)) + { + ioh = pgaio_io_acquire_nb(CurrentResourceOwner, NULL); + if (ioh == NULL) + { + /* + * Need to wait for all IOs, blocking might not be legal in the + * context. + * + * XXX: This doesn't make a whole lot of sense, we're also + * blocking here. What was I smoking when I wrote the above? + */ + io_queue_wait_all(ioq); + ioh = pgaio_io_acquire(CurrentResourceOwner, NULL); + } + } + else + { + ioh = pgaio_io_acquire(CurrentResourceOwner, NULL); + } + + return ioh; +} + +void +io_queue_track(IOQueue *ioq, const struct PgAioWaitRef *iow) +{ + dlist_node *node; + TrackedIO *tio; + + Assert(ioq->has_reserved); + ioq->has_reserved = false; + + Assert(!dclist_is_empty(&ioq->idle)); + + node = dclist_pop_head_node(&ioq->idle); + tio = dclist_container(TrackedIO, node, node); + + tio->iow = *iow; + + dclist_push_tail(&ioq->in_progress, &tio->node); + + ioq->unsubmitted++; + + /* + * XXX: Should have some smarter logic here. We don't want to wait too + * long to submit, that'll mean we're more likely to block. But we also + * don't want to have the overhead of submitting every IO individually. + */ + if (ioq->unsubmitted >= 4) + { + pgaio_submit_staged(); + ioq->unsubmitted = 0; + } +} + +void +io_queue_wait_all(IOQueue *ioq) +{ + /* submit all pending IO before waiting */ + pgaio_submit_staged(); + + while (!dclist_is_empty(&ioq->in_progress)) + { + /* wait for the last IO to minimize unnecessary wakeups */ + dlist_node *node = dclist_tail_node(&ioq->in_progress); + TrackedIO *tio = dclist_container(TrackedIO, node, node); + + if (!pgaio_wref_check_done(&tio->iow)) + { + ereport(DEBUG3, + errmsg("io_queue_wait_all for io:%d", + pgaio_wref_get_id(&tio->iow)), + errhidestmt(true), + errhidecontext(true)); + + pgaio_wref_wait(&tio->iow); + } + + dclist_delete_from(&ioq->in_progress, &tio->node); + dclist_push_head(&ioq->idle, &tio->node); + } +} + +bool +io_queue_is_empty(IOQueue *ioq) +{ + return dclist_is_empty(&ioq->in_progress); +} + +void +io_queue_free(IOQueue *ioq) +{ + io_queue_wait_all(ioq); + + pfree(ioq); +} diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build index da6df2d3654f9..270c4a64428b1 100644 --- a/src/backend/storage/aio/meson.build +++ b/src/backend/storage/aio/meson.build @@ -7,6 +7,7 @@ backend_sources += files( 'aio_init.c', 'aio_io.c', 'aio_target.c', + 'io_queue.c', 'method_io_uring.c', 'method_sync.c', 'method_worker.c', diff --git a/src/include/storage/io_queue.h b/src/include/storage/io_queue.h new file mode 100644 index 0000000000000..92b1e9afe6f59 --- /dev/null +++ b/src/include/storage/io_queue.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * io_queue.h + * Mechanism for tracking many IOs + * + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/io_queue.h + * + *------------------------------------------------------------------------- + */ +#ifndef IO_QUEUE_H +#define IO_QUEUE_H + +#include "storage/aio_types.h" + +struct IOQueue; +typedef struct IOQueue IOQueue; + +struct PgAioWaitRef; + +extern IOQueue *io_queue_create(int depth, int flags); +extern void io_queue_track(IOQueue *ioq, const PgAioWaitRef *iow); +extern void io_queue_wait_one(IOQueue *ioq); +extern void io_queue_wait_all(IOQueue *ioq); +extern bool io_queue_is_empty(IOQueue *ioq); +extern void io_queue_reserve(IOQueue *ioq); +extern PgAioHandle *io_queue_acquire_io(IOQueue *ioq); +extern void io_queue_free(IOQueue *ioq); + +#endif /* IO_QUEUE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3a67ee01b4692..0c6ddadc51daa 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1196,6 +1196,7 @@ IOContext IOFuncSelector IOObject IOOp +IOQueue IO_STATUS_BLOCK IPCompareMethod ITEM @@ -3022,6 +3023,7 @@ TocEntry TokenAuxData TokenizedAuthLine TrackItem +TrackedIO TransApplyAction TransInvalidationInfo TransState From 5097acd9f621e38407a2d369c75a005d43d37fdd Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 07/12] bufmgr: use AIO in checkpointer, bgwriter This is far from ready - just included to be able to exercise AIO writes and get some preliminary numbers. In all likelihood this will instead be based on-top of work by Thomas Munro instead of the preceding commit. TODO; - This doesn't implement bgwriter_flush_after, checkpointer_flush_after I think that's not too hard to do, it's mainly round tuits. - The queuing logic doesn't carefully respect pin limits That might be ok for checkpointer and bgwriter, but the infrastructure should be usable outside of this as well. --- src/backend/postmaster/bgwriter.c | 19 +- src/backend/postmaster/checkpointer.c | 11 +- src/backend/storage/buffer/bufmgr.c | 594 +++++++++++++++++++++++--- src/backend/storage/page/bufpage.c | 10 + src/include/postmaster/bgwriter.h | 3 +- src/include/storage/buf_internals.h | 2 + src/include/storage/bufmgr.h | 3 +- src/include/storage/bufpage.h | 1 + src/tools/pgindent/typedefs.list | 1 + 9 files changed, 586 insertions(+), 58 deletions(-) diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 72f5acceec78d..6e8801a39e329 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -38,11 +38,13 @@ #include "postmaster/auxprocess.h" #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" +#include "storage/aio.h" #include "storage/aio_subsys.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procsignal.h" @@ -90,6 +92,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) sigjmp_buf local_sigjmp_buf; MemoryContext bgwriter_context; bool prev_hibernate; + IOQueue *ioq; WritebackContext wb_context; Assert(startup_data_len == 0); @@ -131,6 +134,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) ALLOCSET_DEFAULT_SIZES); MemoryContextSwitchTo(bgwriter_context); + ioq = io_queue_create(128, 0); WritebackContextInit(&wb_context, &bgwriter_flush_after); /* @@ -228,12 +232,22 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); + /* + * FIXME: this is theoretically racy, but I didn't want to copy + * ProcessMainLoopInterrupts() remaining body here. + */ + if (ShutdownRequestPending) + { + io_queue_wait_all(ioq); + io_queue_free(ioq); + } + ProcessMainLoopInterrupts(); /* * Do one cycle of dirty-buffer writing. */ - can_hibernate = BgBufferSync(&wb_context); + can_hibernate = BgBufferSync(ioq, &wb_context); /* Report pending statistics to the cumulative stats system */ pgstat_report_bgwriter(); @@ -250,6 +264,9 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) smgrdestroyall(); } + /* finish IO before sleeping, to avoid blocking other backends */ + io_queue_wait_all(ioq); + /* * Log a new xl_running_xacts every now and then so replication can * get into a consistent state faster (think of suboverflowed diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index fda91ffd1ce2d..904fe167eb4d0 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -49,10 +49,12 @@ #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" #include "replication/syncrep.h" +#include "storage/aio.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/pmsignal.h" @@ -766,7 +768,7 @@ ImmediateCheckpointRequested(void) * fraction between 0.0 meaning none, and 1.0 meaning all done. */ void -CheckpointWriteDelay(int flags, double progress) +CheckpointWriteDelay(IOQueue *ioq, int flags, double progress) { static int absorb_counter = WRITES_PER_ABSORB; @@ -800,6 +802,13 @@ CheckpointWriteDelay(int flags, double progress) /* Report interim statistics to the cumulative stats system */ pgstat_report_checkpointer(); + /* + * Ensure all pending IO is submitted to avoid unnecessary delays for + * other processes. + */ + io_queue_wait_all(ioq); + + /* * This sleep used to be connected to bgwriter_delay, typically 200ms. * That resulted in more frequent wakeups if not much work to do. diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 9bf30c44af015..d9a362d755393 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -52,6 +52,7 @@ #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -76,6 +77,7 @@ /* Bits in SyncOneBuffer's return value */ #define BUF_WRITTEN 0x01 #define BUF_REUSABLE 0x02 +#define BUF_CANT_MERGE 0x04 #define RELS_BSEARCH_THRESHOLD 20 @@ -515,8 +517,6 @@ static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); static void BufferSync(int flags); static uint32 WaitBufHdrUnlocked(BufferDesc *buf); -static int SyncOneBuffer(int buf_id, bool skip_recently_used, - WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); @@ -532,6 +532,7 @@ static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_c static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); + static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -3321,6 +3322,57 @@ UnpinBufferNoOwner(BufferDesc *buf) } } +typedef struct BuffersToWrite +{ + int nbuffers; + BufferTag start_at_tag; + uint32 max_combine; + + XLogRecPtr max_lsn; + + PgAioHandle *ioh; + PgAioWaitRef iow; + + uint64 total_writes; + + Buffer buffers[IOV_MAX]; + PgAioBounceBuffer *bounce_buffers[IOV_MAX]; + const void *data_ptrs[IOV_MAX]; +} BuffersToWrite; + +static int PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf, + bool skip_recently_used, + IOQueue *ioq, WritebackContext *wb_context); + +static void WriteBuffers(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context); + +static void +BuffersToWriteInit(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context) +{ + to_write->total_writes = 0; + to_write->nbuffers = 0; + to_write->ioh = NULL; + pgaio_wref_clear(&to_write->iow); + to_write->max_lsn = InvalidXLogRecPtr; + + pgaio_enter_batchmode(); +} + +static void +BuffersToWriteEnd(BuffersToWrite *to_write) +{ + if (to_write->ioh != NULL) + { + pgaio_io_release(to_write->ioh); + to_write->ioh = NULL; + } + + pgaio_exit_batchmode(); +} + + #define ST_SORT sort_checkpoint_bufferids #define ST_ELEMENT_TYPE CkptSortItem #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b) @@ -3352,7 +3404,10 @@ BufferSync(int flags) binaryheap *ts_heap; int i; int mask = BM_DIRTY; + IOQueue *ioq; WritebackContext wb_context; + BuffersToWrite to_write; + int max_combine; /* * Unless this is a shutdown checkpoint or we have been explicitly told, @@ -3414,7 +3469,9 @@ BufferSync(int flags) if (num_to_scan == 0) return; /* nothing to do */ + ioq = io_queue_create(512, 0); WritebackContextInit(&wb_context, &checkpoint_flush_after); + max_combine = Min(io_bounce_buffers, io_combine_limit); TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); @@ -3522,48 +3579,91 @@ BufferSync(int flags) */ num_processed = 0; num_written = 0; + + BuffersToWriteInit(&to_write, ioq, &wb_context); + while (!binaryheap_empty(ts_heap)) { BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); + bool batch_continue = true; - buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); - - bufHdr = GetBufferDescriptor(buf_id); - - num_processed++; + Assert(ts_stat->num_scanned <= ts_stat->num_to_scan); /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. + * Collect a batch of buffers to write out from the current + * tablespace. That causes some imbalance between the tablespaces, but + * that's more than outweighed by the efficiency gain due to batching. */ - if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) + while (batch_continue && + to_write.nbuffers < max_combine && + ts_stat->num_scanned < ts_stat->num_to_scan) { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + buf_id = CkptBufferIds[ts_stat->index].buf_id; + Assert(buf_id != -1); + + bufHdr = GetBufferDescriptor(buf_id); + + num_processed++; + + /* + * We don't need to acquire the lock here, because we're only + * looking at a single bit. It's possible that someone else writes + * the buffer and clears the flag right after we check, but that + * doesn't matter since SyncOneBuffer will then do nothing. + * However, there is a further race condition: it's conceivable + * that between the time we examine the bit here and the time + * SyncOneBuffer acquires the lock, someone else not only wrote + * the buffer but replaced it with another page and dirtied it. In + * that improbable case, SyncOneBuffer will write the buffer + * though we didn't need to. It doesn't seem worth guarding + * against this, though. + */ + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) + { + int result = PrepareToWriteBuffer(&to_write, buf_id + 1, false, + ioq, &wb_context); + + if (result & BUF_CANT_MERGE) + { + Assert(to_write.nbuffers > 0); + WriteBuffers(&to_write, ioq, &wb_context); + + result = PrepareToWriteBuffer(&to_write, buf_id + 1, false, + ioq, &wb_context); + Assert(result != BUF_CANT_MERGE); + } + + if (result & BUF_WRITTEN) + { + TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); + PendingCheckpointerStats.buffers_written++; + num_written++; + } + else + { + batch_continue = false; + } + } + else { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, &wb_context); } + + /* + * Measure progress independent of actually having to flush the + * buffer - otherwise writing become unbalanced. + */ + ts_stat->progress += ts_stat->progress_slice; + ts_stat->num_scanned++; + ts_stat->index++; } - /* - * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. - */ - ts_stat->progress += ts_stat->progress_slice; - ts_stat->num_scanned++; - ts_stat->index++; + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, &wb_context); + /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) @@ -3581,15 +3681,23 @@ BufferSync(int flags) * * (This will check for barrier events even if it doesn't sleep.) */ - CheckpointWriteDelay(flags, (double) num_processed / num_to_scan); + CheckpointWriteDelay(ioq, flags, (double) num_processed / num_to_scan); } + Assert(to_write.nbuffers == 0); + io_queue_wait_all(ioq); + /* * Issue all pending flushes. Only checkpointer calls BufferSync(), so * IOContext will always be IOCONTEXT_NORMAL. */ IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL); + io_queue_wait_all(ioq); /* IssuePendingWritebacks might have added + * more */ + io_queue_free(ioq); + BuffersToWriteEnd(&to_write); + pfree(per_ts_stat); per_ts_stat = NULL; binaryheap_free(ts_heap); @@ -3615,7 +3723,7 @@ BufferSync(int flags) * bgwriter_lru_maxpages to 0.) */ bool -BgBufferSync(WritebackContext *wb_context) +BgBufferSync(IOQueue *ioq, WritebackContext *wb_context) { /* info obtained from freelist.c */ int strategy_buf_id; @@ -3658,6 +3766,9 @@ BgBufferSync(WritebackContext *wb_context) long new_strategy_delta; uint32 new_recent_alloc; + BuffersToWrite to_write; + int max_combine; + /* * Find out where the freelist clock sweep currently is, and how many * buffer allocations have happened since our last call. @@ -3678,6 +3789,8 @@ BgBufferSync(WritebackContext *wb_context) return true; } + max_combine = Min(io_bounce_buffers, io_combine_limit); + /* * Compute strategy_delta = how many buffers have been scanned by the * clock sweep since last time. If first time through, assume none. Then @@ -3834,11 +3947,25 @@ BgBufferSync(WritebackContext *wb_context) num_written = 0; reusable_buffers = reusable_buffers_est; + BuffersToWriteInit(&to_write, ioq, wb_context); + /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state; + + sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1, + true, ioq, wb_context); + if (sync_state & BUF_CANT_MERGE) + { + Assert(to_write.nbuffers > 0); + + WriteBuffers(&to_write, ioq, wb_context); + + sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1, + true, ioq, wb_context); + Assert(sync_state != BUF_CANT_MERGE); + } if (++next_to_clean >= NBuffers) { @@ -3849,6 +3976,13 @@ BgBufferSync(WritebackContext *wb_context) if (sync_state & BUF_WRITTEN) { + Assert(sync_state & BUF_REUSABLE); + + if (to_write.nbuffers == max_combine) + { + WriteBuffers(&to_write, ioq, wb_context); + } + reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) { @@ -3860,6 +3994,11 @@ BgBufferSync(WritebackContext *wb_context) reusable_buffers++; } + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, wb_context); + + BuffersToWriteEnd(&to_write); + PendingBgWriterStats.buf_written_clean += num_written; #ifdef BGW_DEBUG @@ -3898,8 +4037,66 @@ BgBufferSync(WritebackContext *wb_context) return (bufs_to_lap == 0 && recent_alloc == 0); } +static inline bool +BufferTagsSameRel(const BufferTag *tag1, const BufferTag *tag2) +{ + return (tag1->spcOid == tag2->spcOid) && + (tag1->dbOid == tag2->dbOid) && + (tag1->relNumber == tag2->relNumber) && + (tag1->forkNum == tag2->forkNum) + ; +} + +static bool +CanMergeWrite(BuffersToWrite *to_write, BufferDesc *cur_buf_hdr) +{ + BlockNumber cur_block = cur_buf_hdr->tag.blockNum; + + Assert(to_write->nbuffers > 0); /* can't merge with nothing */ + Assert(to_write->start_at_tag.relNumber != InvalidOid); + Assert(to_write->start_at_tag.blockNum != InvalidBlockNumber); + + Assert(to_write->ioh != NULL); + + /* + * First check if the blocknumber is one that we could actually merge, + * that's cheaper than checking the tablespace/db/relnumber/fork match. + */ + if (to_write->start_at_tag.blockNum + to_write->nbuffers != cur_block) + return false; + + if (!BufferTagsSameRel(&to_write->start_at_tag, &cur_buf_hdr->tag)) + return false; + + /* + * Need to check with smgr how large a write we're allowed to make. To + * reduce the overhead of the smgr check, only inquire once, when + * processing the first to-be-merged buffer. That avoids the overhead in + * the common case of writing out buffers that definitely not mergeable. + */ + if (to_write->nbuffers == 1) + { + SMgrRelation smgr; + + smgr = smgropen(BufTagGetRelFileLocator(&to_write->start_at_tag), INVALID_PROC_NUMBER); + + to_write->max_combine = smgrmaxcombine(smgr, + to_write->start_at_tag.forkNum, + to_write->start_at_tag.blockNum); + } + else + { + Assert(to_write->max_combine > 0); + } + + if (to_write->start_at_tag.blockNum + to_write->max_combine <= cur_block) + return false; + + return true; +} + /* - * SyncOneBuffer -- process a single buffer during syncing. + * PrepareToWriteBuffer -- process a single buffer during syncing. * * If skip_recently_used is true, we don't write currently-pinned buffers, nor * buffers marked recently used, as these are not replacement candidates. @@ -3908,22 +4105,50 @@ BgBufferSync(WritebackContext *wb_context) * BUF_WRITTEN: we wrote the buffer. * BUF_REUSABLE: buffer is available for replacement, ie, it has * pin count 0 and usage count 0. + * BUF_CANT_MERGE: can't combine this write with prior writes, caller needs + * to issue those first * * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean * after locking it, but we don't care all that much.) */ static int -SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) +PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf, + bool skip_recently_used, + IOQueue *ioq, WritebackContext *wb_context) { - BufferDesc *bufHdr = GetBufferDescriptor(buf_id); - int result = 0; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(buf - 1); uint32 buf_state; - BufferTag tag; + int result = 0; + XLogRecPtr cur_buf_lsn; + LWLock *content_lock; + bool may_block; + + /* + * Check if this buffer can be written out together with already prepared + * writes. We check before we have pinned the buffer, so the buffer can be + * written out and replaced between this check and us pinning the buffer - + * we'll recheck below. The reason for the pre-check is that we don't want + * to pin the buffer just to find out that we can't merge the IO. + */ + if (to_write->nbuffers != 0) + { + if (!CanMergeWrite(to_write, cur_buf_hdr)) + { + result |= BUF_CANT_MERGE; + return result; + } + } + else + { + to_write->start_at_tag = cur_buf_hdr->tag; + } /* Make sure we can handle the pin */ ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); + /* XXX: Should also check if we are allowed to pin one more buffer */ + /* * Check whether buffer needs writing. * @@ -3933,7 +4158,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ - buf_state = LockBufHdr(bufHdr); + buf_state = LockBufHdr(cur_buf_hdr); if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && BUF_STATE_GET_USAGECOUNT(buf_state) == 0) @@ -3942,40 +4167,300 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) } else if (skip_recently_used) { +#if 0 + elog(LOG, "at block %d: skip recent with nbuffers %d", + cur_buf_hdr->tag.blockNum, to_write->nbuffers); +#endif /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr, buf_state); + UnlockBufHdr(cur_buf_hdr, buf_state); return result; } if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr, buf_state); + UnlockBufHdr(cur_buf_hdr, buf_state); return result; } + /* pin the buffer, from now on its identity can't change anymore */ + PinBuffer_Locked(cur_buf_hdr); + /* - * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the - * buffer is clean by the time we've locked it.) + * Acquire IO, if needed, now that it's likely that we'll need to write. */ - PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); + if (to_write->ioh == NULL) + { + /* otherwise we should already have acquired a handle */ + Assert(to_write->nbuffers == 0); - FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); + to_write->ioh = io_queue_acquire_io(ioq); + pgaio_io_get_wref(to_write->ioh, &to_write->iow); + } - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + /* + * If we are merging, check if the buffer's identity possibly changed + * while we hadn't yet pinned it. + * + * XXX: It might be worth checking if we still want to write the buffer + * out, e.g. it could have been replaced with a buffer that doesn't have + * BM_CHECKPOINT_NEEDED set. + */ + if (to_write->nbuffers != 0) + { + if (!CanMergeWrite(to_write, cur_buf_hdr)) + { + elog(LOG, "changed identity"); + UnpinBuffer(cur_buf_hdr); + + result |= BUF_CANT_MERGE; + + return result; + } + } + + may_block = to_write->nbuffers == 0 + && !pgaio_have_staged() + && io_queue_is_empty(ioq) + ; + content_lock = BufferDescriptorGetContentLock(cur_buf_hdr); + + if (!may_block) + { + if (LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + /* done */ + } + else if (to_write->nbuffers == 0) + { + /* + * Need to wait for all prior IO to finish before blocking for + * lock acquisition, to avoid the risk a deadlock due to us + * waiting for another backend that is waiting for our unsubmitted + * IO to complete. + */ + pgaio_submit_staged(); + io_queue_wait_all(ioq); - tag = bufHdr->tag; + elog(DEBUG2, "at block %u: can't block, nbuffers = 0", + cur_buf_hdr->tag.blockNum + ); - UnpinBuffer(bufHdr); + may_block = to_write->nbuffers == 0 + && !pgaio_have_staged() + && io_queue_is_empty(ioq) + ; + Assert(may_block); + + LWLockAcquire(content_lock, LW_SHARED); + } + else + { + elog(DEBUG2, "at block %d: can't block nbuffers = %d", + cur_buf_hdr->tag.blockNum, + to_write->nbuffers); + + UnpinBuffer(cur_buf_hdr); + result |= BUF_CANT_MERGE; + Assert(to_write->nbuffers > 0); + + return result; + } + } + else + { + LWLockAcquire(content_lock, LW_SHARED); + } + + if (!may_block) + { + if (!StartBufferIO(cur_buf_hdr, false, !may_block)) + { + pgaio_submit_staged(); + io_queue_wait_all(ioq); + + may_block = io_queue_is_empty(ioq) && to_write->nbuffers == 0 && !pgaio_have_staged(); + + if (!StartBufferIO(cur_buf_hdr, false, !may_block)) + { + elog(DEBUG2, "at block %d: non-waitable StartBufferIO returns false, %d", + cur_buf_hdr->tag.blockNum, + may_block); + + /* + * FIXME: can't tell whether this is because the buffer has + * been cleaned + */ + if (!may_block) + { + result |= BUF_CANT_MERGE; + Assert(to_write->nbuffers > 0); + } + LWLockRelease(content_lock); + UnpinBuffer(cur_buf_hdr); + + return result; + } + } + } + else + { + if (!StartBufferIO(cur_buf_hdr, false, false)) + { + elog(DEBUG2, "waitable StartBufferIO returns false"); + LWLockRelease(content_lock); + UnpinBuffer(cur_buf_hdr); + + /* + * FIXME: Historically we returned BUF_WRITTEN in this case, which + * seems wrong + */ + return result; + } + } /* - * SyncOneBuffer() is only called by checkpointer and bgwriter, so - * IOContext will always be IOCONTEXT_NORMAL. + * Run PageGetLSN while holding header lock, since we don't have the + * buffer locked exclusively in all cases. + */ + buf_state = LockBufHdr(cur_buf_hdr); + + cur_buf_lsn = BufferGetLSN(cur_buf_hdr); + + /* To check if block content changes while flushing. - vadim 01/17/97 */ + buf_state &= ~BM_JUST_DIRTIED; + + UnlockBufHdr(cur_buf_hdr, buf_state); + + to_write->buffers[to_write->nbuffers] = buf; + to_write->nbuffers++; + + if (buf_state & BM_PERMANENT && + (to_write->max_lsn == InvalidXLogRecPtr || to_write->max_lsn < cur_buf_lsn)) + { + to_write->max_lsn = cur_buf_lsn; + } + + result |= BUF_WRITTEN; + + return result; +} + +static void +WriteBuffers(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context) +{ + SMgrRelation smgr; + Buffer first_buf; + BufferDesc *first_buf_hdr; + bool needs_checksum; + + Assert(to_write->nbuffers > 0 && to_write->nbuffers <= io_combine_limit); + + first_buf = to_write->buffers[0]; + first_buf_hdr = GetBufferDescriptor(first_buf - 1); + + smgr = smgropen(BufTagGetRelFileLocator(&first_buf_hdr->tag), INVALID_PROC_NUMBER); + + /* + * Force XLOG flush up to buffer's LSN. This implements the basic WAL + * rule that log updates must hit disk before any of the data-file changes + * they describe do. + * + * However, this rule does not apply to unlogged relations, which will be + * lost after a crash anyway. Most unlogged relation pages do not bear + * LSNs since we never emit WAL records for them, and therefore flushing + * up through the buffer LSN would be useless, but harmless. However, + * GiST indexes use LSNs internally to track page-splits, and therefore + * unlogged GiST pages bear "fake" LSNs generated by + * GetFakeLSNForUnloggedRel. It is unlikely but possible that the fake + * LSN counter could advance past the WAL insertion point; and if it did + * happen, attempting to flush WAL through that location would fail, with + * disastrous system-wide consequences. To make sure that can't happen, + * skip the flush if the buffer isn't permanent. */ - ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); + if (to_write->max_lsn != InvalidXLogRecPtr) + XLogFlush(to_write->max_lsn); - return result | BUF_WRITTEN; + /* + * Now it's safe to write the buffer to disk. Note that no one else should + * have been able to write it, while we were busy with log flushing, + * because we got the exclusive right to perform I/O by setting the + * BM_IO_IN_PROGRESS bit. + */ + + for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++) + { + Buffer cur_buf = to_write->buffers[nbuf]; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1); + Block bufBlock; + char *bufToWrite; + + bufBlock = BufHdrGetBlock(cur_buf_hdr); + needs_checksum = PageNeedsChecksumCopy((Page) bufBlock); + + /* + * Update page checksum if desired. Since we have only shared lock on + * the buffer, other processes might be updating hint bits in it, so + * we must copy the page to a bounce buffer if we do checksumming. + */ + if (needs_checksum) + { + PgAioBounceBuffer *bb = pgaio_bounce_buffer_get(); + + pgaio_io_assoc_bounce_buffer(to_write->ioh, bb); + + bufToWrite = pgaio_bounce_buffer_buffer(bb); + memcpy(bufToWrite, bufBlock, BLCKSZ); + PageSetChecksumInplace((Page) bufToWrite, cur_buf_hdr->tag.blockNum); + } + else + { + bufToWrite = bufBlock; + } + + to_write->data_ptrs[nbuf] = bufToWrite; + } + + pgaio_io_set_handle_data_32(to_write->ioh, + (uint32 *) to_write->buffers, + to_write->nbuffers); + pgaio_io_register_callbacks(to_write->ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV, 0); + + smgrstartwritev(to_write->ioh, smgr, + BufTagGetForkNum(&first_buf_hdr->tag), + first_buf_hdr->tag.blockNum, + to_write->data_ptrs, + to_write->nbuffers, + false); + pgstat_count_io_op(IOOBJECT_RELATION, IOCONTEXT_NORMAL, + IOOP_WRITE, 1, BLCKSZ * to_write->nbuffers); + + + for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++) + { + Buffer cur_buf = to_write->buffers[nbuf]; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1); + + UnpinBuffer(cur_buf_hdr); + } + + io_queue_track(ioq, &to_write->iow); + to_write->total_writes++; + + /* clear state for next write */ + to_write->nbuffers = 0; + to_write->start_at_tag.relNumber = InvalidOid; + to_write->start_at_tag.blockNum = InvalidBlockNumber; + to_write->max_combine = 0; + to_write->max_lsn = InvalidXLogRecPtr; + to_write->ioh = NULL; + pgaio_wref_clear(&to_write->iow); + + /* + * FIXME: Implement issuing writebacks (note wb_context isn't used here). + * Possibly needs to be integrated with io_queue.c. + */ } /* @@ -4349,6 +4834,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index 82457bacc62fa..d4bbd2cdf00e9 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1490,6 +1490,16 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum, return true; } +bool +PageNeedsChecksumCopy(Page page) +{ + if (PageIsNew(page)) + return false; + + /* If we don't need a checksum, just return the passed-in data */ + return DataChecksumsEnabled(); +} + /* * Set checksum for a page in shared buffers. diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h index 800ecbfd13b31..a8081d411b68d 100644 --- a/src/include/postmaster/bgwriter.h +++ b/src/include/postmaster/bgwriter.h @@ -31,7 +31,8 @@ pg_noreturn extern void BackgroundWriterMain(const void *startup_data, size_t st pg_noreturn extern void CheckpointerMain(const void *startup_data, size_t startup_data_len); extern void RequestCheckpoint(int flags); -extern void CheckpointWriteDelay(int flags, double progress); +struct IOQueue; +extern void CheckpointWriteDelay(struct IOQueue *ioq, int flags, double progress); extern bool ForwardSyncRequest(const FileTag *ftag, SyncRequestType type); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 0dec7d93b3b27..45c2b70b73602 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -21,6 +21,8 @@ #include "storage/buf.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" +#include "storage/io_queue.h" +#include "storage/latch.h" #include "storage/lwlock.h" #include "storage/procnumber.h" #include "storage/shmem.h" diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 492feab0cb561..89e0ca11288b4 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -297,7 +297,8 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer); extern bool IsBufferCleanupOK(Buffer buffer); extern bool HoldingBufferPinThatDelaysRecovery(void); -extern bool BgBufferSync(struct WritebackContext *wb_context); +struct IOQueue; +extern bool BgBufferSync(struct IOQueue *ioq, struct WritebackContext *wb_context); extern uint32 GetPinLimit(void); extern uint32 GetLocalPinLimit(void); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index aeb67c498c59f..000a3ab23f905 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -507,5 +507,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, Item newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern bool PageNeedsChecksumCopy(Page page); #endif /* BUFPAGE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0c6ddadc51daa..e89c501fa8aff 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -349,6 +349,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BuffersToWrite BuildAccumulator BuiltinScript BulkInsertState From 1ad522c732af95ee7aef800097743a051dd23f1d Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 08/12] Ensure a resowner exists for all paths that may perform AIO Reviewed-by: Noah Misch Discussion: https://postgr.es/m/1f6b50a7-38ef-4d87-8246-786d39f46ab9@iki.fi --- src/backend/bootstrap/bootstrap.c | 7 +++++++ src/backend/replication/logical/logical.c | 6 ++++++ src/backend/utils/init/postinit.c | 6 +++++- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 6db864892d0dd..e554504e1f0db 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -361,8 +361,15 @@ BootstrapModeMain(int argc, char *argv[], bool check_only) BaseInit(); bootstrap_signals(); + + /* need a resowner for IO during BootStrapXLOG() */ + CreateAuxProcessResourceOwner(); + BootStrapXLOG(bootstrap_data_checksum_version); + ReleaseAuxProcessResources(true); + CurrentResourceOwner = NULL; + /* * To ensure that src/common/link-canary.c is linked into the backend, we * must call it from somewhere. Here is as good as anywhere. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index a8d2e024d3444..72c32f5c32e8c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -386,6 +386,12 @@ CreateInitDecodingContext(const char *plugin, slot->data.plugin = plugin_name; SpinLockRelease(&slot->mutex); + if (CurrentResourceOwner == NULL) + { + Assert(am_walsender); + CurrentResourceOwner = AuxProcessResourceOwner; + } + if (XLogRecPtrIsInvalid(restart_lsn)) ReplicationSlotReserveWal(); else diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 7958ea11b7354..222e24bcb08bf 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -785,8 +785,12 @@ InitPostgres(const char *in_dbname, Oid dboid, * We don't yet have an aux-process resource owner, but StartupXLOG * and ShutdownXLOG will need one. Hence, create said resource owner * (and register a callback to clean it up after ShutdownXLOG runs). + * + * In bootstrap mode CreateAuxProcessResourceOwner() was already + * called in BootstrapModeMain(). */ - CreateAuxProcessResourceOwner(); + if (!bootstrap) + CreateAuxProcessResourceOwner(); StartupXLOG(); /* Release (and warn about) any buffer pins leaked in StartupXLOG */ From 2ed454d5d570828f18892702902998d05df4d59b Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 09/12] Temporary: Increase BAS_BULKREAD size Without this we only can execute very little AIO for sequential scans, as there's just not enough buffers in the ring. This isn't the right fix, as just increasing the ring size can have negative performance implications in workloads where the kernel has all the data cached. Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/storage/buffer/freelist.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 336715b6c6342..b72a5957a206a 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -555,7 +555,12 @@ GetAccessStrategy(BufferAccessStrategyType btype) return NULL; case BAS_BULKREAD: - ring_size_kb = 256; + + /* + * FIXME: Temporary increase to allow large enough streaming reads + * to actually benefit from AIO. This needs a better solution. + */ + ring_size_kb = 2 * 1024; break; case BAS_BULKWRITE: ring_size_kb = 16 * 1024; From f5a9dba80db57a52e2fe7921c856ddeba83f738a Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 10/12] WIP: Use MAP_POPULATE For benchmarking it's quite annoying that the first time a memory is touched has completely different perf characteristics than subsequent accesses. Using MAP_POPULATE reduces that substantially. --- src/backend/port/sysv_shmem.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/port/sysv_shmem.c b/src/backend/port/sysv_shmem.c index 197926d44f6bc..a700b02d5a186 100644 --- a/src/backend/port/sysv_shmem.c +++ b/src/backend/port/sysv_shmem.c @@ -620,7 +620,7 @@ CreateAnonymousSegment(Size *size) allocsize += hugepagesize - (allocsize % hugepagesize); ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE, - PG_MMAP_FLAGS | mmap_flags, -1, 0); + PG_MMAP_FLAGS | MAP_POPULATE | mmap_flags, -1, 0); mmap_errno = errno; if (huge_pages == HUGE_PAGES_TRY && ptr == MAP_FAILED) elog(DEBUG1, "mmap(%zu) with MAP_HUGETLB failed, huge pages disabled: %m", From d82472603a24939d9191b8d393b7ea123730803f Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 18 Mar 2025 14:40:06 -0400 Subject: [PATCH 11/12] StartReadBuffers debug stuff --- src/backend/storage/buffer/bufmgr.c | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d9a362d755393..23de216f35849 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1777,6 +1777,18 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) IOObject io_object; bool did_start_io; +#if 0 + ereport(DEBUG3, + errmsg("%s: op->blocks: %d, op->blocks_done: %d, *nblocks_progress: %d, first buf %d", + __func__, + operation->nblocks, + nblocks_done, + *nblocks_progress, + buffers[0]), + errhidestmt(true), + errhidecontext(true)); +#endif + /* * When this IO is executed synchronously, either because the caller will * immediately block waiting for the IO or because IOMETHOD_SYNC is used, @@ -1870,6 +1882,13 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) operation->nblocks_done += 1; *nblocks_progress = 1; + ereport(DEBUG3, + errmsg("%s - trunc: %d", + __func__, + operation->nblocks_done), + errhidestmt(true), + errhidecontext(true)); + pgaio_io_release(ioh); pgaio_wref_clear(&operation->io_wref); did_start_io = false; @@ -1916,6 +1935,12 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ for (int i = nblocks_done + 1; i < operation->nblocks; i++) { +#if 0 + /* FIXME: Remove forced short read */ + if (i > 3) + break; +#endif + if (!ReadBuffersCanStartIO(buffers[i], true)) break; /* Must be consecutive block numbers. */ From 880fb4e41105245fd5db8c8bb00f9e3ca79efbb0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 21 Mar 2025 17:54:13 -0400 Subject: [PATCH 12/12] local: iwyu-ify --- src/backend/storage/aio/aio.c | 5 ++--- src/backend/storage/aio/aio_callback.c | 1 - src/backend/storage/aio/aio_init.c | 3 +-- src/backend/storage/aio/aio_io.c | 2 -- src/backend/storage/aio/aio_target.c | 1 - src/backend/storage/aio/method_io_uring.c | 3 ++- src/backend/storage/aio/method_worker.c | 6 +++++- src/include/libpq/pqsignal.h | 2 +- src/include/storage/aio.h | 4 ++-- src/include/storage/aio_internal.h | 5 +++++ src/include/utils/elog.h | 4 ++-- 11 files changed, 20 insertions(+), 16 deletions(-) diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index cff48964d0760..43f1e2a77855c 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -38,16 +38,15 @@ #include "postgres.h" -#include "lib/ilist.h" #include "miscadmin.h" #include "port/atomics.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" +#include "storage/procnumber.h" #include "utils/guc.h" #include "utils/guc_hooks.h" #include "utils/resowner.h" -#include "utils/wait_event_types.h" +#include "utils/wait_event.h" #ifdef USE_INJECTION_POINTS #include "utils/injection_point.h" diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index 21665e7eccb31..4e0f2224c175d 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -16,7 +16,6 @@ #include "postgres.h" #include "miscadmin.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/bufmgr.h" #include "storage/md.h" diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c index 95b10933fedbf..d0e982322b80e 100644 --- a/src/backend/storage/aio/aio_init.c +++ b/src/backend/storage/aio/aio_init.c @@ -15,13 +15,12 @@ #include "postgres.h" #include "miscadmin.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" -#include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/proc.h" +#include "storage/procnumber.h" #include "storage/shmem.h" #include "utils/guc.h" diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c index 4d31392ddc704..09fa56063988a 100644 --- a/src/backend/storage/aio/aio_io.c +++ b/src/backend/storage/aio/aio_io.c @@ -19,9 +19,7 @@ #include "postgres.h" #include "miscadmin.h" -#include "storage/aio.h" #include "storage/aio_internal.h" -#include "storage/fd.h" #include "utils/wait_event.h" diff --git a/src/backend/storage/aio/aio_target.c b/src/backend/storage/aio/aio_target.c index ac6c74f4ff264..2213bdadaa07c 100644 --- a/src/backend/storage/aio/aio_target.c +++ b/src/backend/storage/aio/aio_target.c @@ -14,7 +14,6 @@ #include "postgres.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/smgr.h" diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index c719ba2727a81..244918e1883df 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -25,11 +25,12 @@ #include "postgres.h" /* included early, for IOMETHOD_IO_URING_ENABLED */ -#include "storage/aio.h" +#include "storage/aio.h" /* IWYU pragma: keep */ #ifdef IOMETHOD_IO_URING_ENABLED #include +/* IWYU pragma: no_include */ #include "miscadmin.h" #include "storage/aio_internal.h" diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 31d94ac82c540..52f901ed4c2c8 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -34,13 +34,17 @@ #include "port/pg_bitutils.h" #include "postmaster/auxprocess.h" #include "postmaster/interrupt.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/lwlocknames.h" #include "storage/proc.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "storage/waiteventset.h" #include "tcop/tcopprot.h" #include "utils/ps_status.h" #include "utils/wait_event.h" diff --git a/src/include/libpq/pqsignal.h b/src/include/libpq/pqsignal.h index 5be7870156ee0..89276242fb4f5 100644 --- a/src/include/libpq/pqsignal.h +++ b/src/include/libpq/pqsignal.h @@ -13,7 +13,7 @@ #ifndef PQSIGNAL_H #define PQSIGNAL_H -#include +#include /* IWYU pragma: export */ #ifdef WIN32 /* Emulate POSIX sigset_t APIs on Windows */ diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index 72d5680e767e9..aa242676ccfdd 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -18,7 +18,7 @@ #ifndef AIO_H #define AIO_H -#include "storage/aio_types.h" +#include "storage/aio_types.h" /* IWYU pragma: export */ #include "storage/procnumber.h" @@ -293,7 +293,7 @@ extern ProcNumber pgaio_io_get_owner(PgAioHandle *ioh); extern void pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow); /* functions in aio_io.c */ -struct iovec; +struct iovec; /* IWYU pragma: export */ extern int pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov); extern PgAioOp pgaio_io_get_op(PgAioHandle *ioh); diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h index 833f97361a1ff..8432ebbc67942 100644 --- a/src/include/storage/aio_internal.h +++ b/src/include/storage/aio_internal.h @@ -16,10 +16,12 @@ #define AIO_INTERNAL_H +/* IWYU pragma: begin_exports */ #include "lib/ilist.h" #include "port/pg_iovec.h" #include "storage/aio.h" #include "storage/condition_variable.h" +/* IWYU pragma: end_exports */ /* @@ -90,8 +92,11 @@ typedef enum PgAioHandleState } PgAioHandleState; +/* IWYU pragma: begin_exports */ struct ResourceOwnerData; +/* IWYU pragma: end_exports */ + /* typedef is in aio_types.h */ struct PgAioHandle { diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index a5313c5d2d5ae..78cdd90c71807 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -14,7 +14,7 @@ #ifndef ELOG_H #define ELOG_H -#include +#include /* IWYU pragma: export */ #include "lib/stringinfo.h" @@ -76,7 +76,7 @@ struct Node; #define ERRCODE_IS_CATEGORY(ec) (((ec) & ~((1 << 12) - 1)) == 0) /* SQLSTATE codes for errors are defined in a separate file */ -#include "utils/errcodes.h" +#include "utils/errcodes.h" /* IWYU pragma: export */ /* * Provide a way to prevent "errno" from being accidentally used inside an