diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 6db864892d0dd..e554504e1f0db 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -361,8 +361,15 @@ BootstrapModeMain(int argc, char *argv[], bool check_only) BaseInit(); bootstrap_signals(); + + /* need a resowner for IO during BootStrapXLOG() */ + CreateAuxProcessResourceOwner(); + BootStrapXLOG(bootstrap_data_checksum_version); + ReleaseAuxProcessResources(true); + CurrentResourceOwner = NULL; + /* * To ensure that src/common/link-canary.c is linked into the backend, we * must call it from somewhere. Here is as good as anywhere. diff --git a/src/backend/port/sysv_shmem.c b/src/backend/port/sysv_shmem.c index 197926d44f6bc..a700b02d5a186 100644 --- a/src/backend/port/sysv_shmem.c +++ b/src/backend/port/sysv_shmem.c @@ -620,7 +620,7 @@ CreateAnonymousSegment(Size *size) allocsize += hugepagesize - (allocsize % hugepagesize); ptr = mmap(NULL, allocsize, PROT_READ | PROT_WRITE, - PG_MMAP_FLAGS | mmap_flags, -1, 0); + PG_MMAP_FLAGS | MAP_POPULATE | mmap_flags, -1, 0); mmap_errno = errno; if (huge_pages == HUGE_PAGES_TRY && ptr == MAP_FAILED) elog(DEBUG1, "mmap(%zu) with MAP_HUGETLB failed, huge pages disabled: %m", diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index 72f5acceec78d..6e8801a39e329 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -38,11 +38,13 @@ #include "postmaster/auxprocess.h" #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" +#include "storage/aio.h" #include "storage/aio_subsys.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "storage/procsignal.h" @@ -90,6 +92,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) sigjmp_buf local_sigjmp_buf; MemoryContext bgwriter_context; bool prev_hibernate; + IOQueue *ioq; WritebackContext wb_context; Assert(startup_data_len == 0); @@ -131,6 +134,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) ALLOCSET_DEFAULT_SIZES); MemoryContextSwitchTo(bgwriter_context); + ioq = io_queue_create(128, 0); WritebackContextInit(&wb_context, &bgwriter_flush_after); /* @@ -228,12 +232,22 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); + /* + * FIXME: this is theoretically racy, but I didn't want to copy + * ProcessMainLoopInterrupts() remaining body here. + */ + if (ShutdownRequestPending) + { + io_queue_wait_all(ioq); + io_queue_free(ioq); + } + ProcessMainLoopInterrupts(); /* * Do one cycle of dirty-buffer writing. 
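+		 * Writes are issued through the IO queue now, so a cycle can leave
+		 * IOs in flight; they are waited on below, before sleeping.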
*/ - can_hibernate = BgBufferSync(&wb_context); + can_hibernate = BgBufferSync(ioq, &wb_context); /* Report pending statistics to the cumulative stats system */ pgstat_report_bgwriter(); @@ -250,6 +264,9 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len) smgrdestroyall(); } + /* finish IO before sleeping, to avoid blocking other backends */ + io_queue_wait_all(ioq); + /* * Log a new xl_running_xacts every now and then so replication can * get into a consistent state faster (think of suboverflowed diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index fda91ffd1ce2d..904fe167eb4d0 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -49,10 +49,12 @@ #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" #include "replication/syncrep.h" +#include "storage/aio.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/pmsignal.h" @@ -766,7 +768,7 @@ ImmediateCheckpointRequested(void) * fraction between 0.0 meaning none, and 1.0 meaning all done. */ void -CheckpointWriteDelay(int flags, double progress) +CheckpointWriteDelay(IOQueue *ioq, int flags, double progress) { static int absorb_counter = WRITES_PER_ABSORB; @@ -800,6 +802,13 @@ CheckpointWriteDelay(int flags, double progress) /* Report interim statistics to the cumulative stats system */ pgstat_report_checkpointer(); + /* + * Ensure all pending IO is submitted to avoid unnecessary delays for + * other processes. + */ + io_queue_wait_all(ioq); + + /* * This sleep used to be connected to bgwriter_delay, typically 200ms. * That resulted in more frequent wakeups if not much work to do. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index a8d2e024d3444..72c32f5c32e8c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -386,6 +386,12 @@ CreateInitDecodingContext(const char *plugin, slot->data.plugin = plugin_name; SpinLockRelease(&slot->mutex); + if (CurrentResourceOwner == NULL) + { + Assert(am_walsender); + CurrentResourceOwner = AuxProcessResourceOwner; + } + if (XLogRecPtrIsInvalid(restart_lsn)) ReplicationSlotReserveWal(); else diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile index 3f2469cc39945..86fa4276fda99 100644 --- a/src/backend/storage/aio/Makefile +++ b/src/backend/storage/aio/Makefile @@ -15,6 +15,7 @@ OBJS = \ aio_init.o \ aio_io.o \ aio_target.o \ + io_queue.o \ method_io_uring.o \ method_sync.o \ method_worker.o \ diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md index b00de269ad926..7d9ea23228eac 100644 --- a/src/backend/storage/aio/README.md +++ b/src/backend/storage/aio/README.md @@ -406,6 +406,33 @@ shared memory no less!), completion callbacks instead have to encode errors in a more compact format that can be converted into an error message. +### AIO Bounce Buffers + +For some uses of AIO there is no convenient memory location as the source / +destination of an AIO. E.g. when data checksums are enabled, writes from +shared buffers currently cannot be done directly from shared buffers, as a +shared buffer lock still allows some modification, e.g., for hint bits (see +`FlushBuffer()`). If the write were done in-place, such modifications can +cause the checksum to fail. 
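+
+A rough sketch of the race (illustrative pseudocode only, not the actual
+`FlushBuffer()` sequence; `page` and `blkno` are placeholders):
+
+```c
+/* flusher: holds only a share lock on the buffer */
+PageSetChecksumInplace(page, blkno);	/* checksum over the current bytes */
+/* another backend may set a hint bit in 'page' right here */
+smgrwritev(...);						/* kernel copies the modified bytes */
+```
+
+If a hint bit changes between computing the checksum and the kernel reading
+the page, the on-disk page later fails checksum verification.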
+ +For synchronous IO this is solved by copying the buffer to separate memory +before computing the checksum and using that copy as the source buffer for the +AIO. + +However, for AIO that is not a workable solution: +- Instead of a single buffer many buffers are required, as many IOs might be + in flight +- When using the [worker method](#worker), the source/target of IO needs to be + in shared memory, otherwise the workers won't be able to access the memory. + +The AIO subsystem addresses this by providing a limited number of bounce +buffers that can be used as the source / target for IO. A bounce buffer can be +acquired with `pgaio_bounce_buffer_get()` and multiple bounce buffers can be +associated with an AIO Handle with `pgaio_io_assoc_bounce_buffer()`. + +Bounce buffers are automatically released when the IO completes. + + ## Helpers Using the low-level AIO API introduces too much complexity to do so all over diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 86f7250b7a5f2..43f1e2a77855c 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -38,16 +38,15 @@ #include "postgres.h" -#include "lib/ilist.h" #include "miscadmin.h" #include "port/atomics.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" +#include "storage/procnumber.h" #include "utils/guc.h" #include "utils/guc_hooks.h" #include "utils/resowner.h" -#include "utils/wait_event_types.h" +#include "utils/wait_event.h" #ifdef USE_INJECTION_POINTS #include "utils/injection_point.h" @@ -62,6 +61,8 @@ static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation static const char *pgaio_io_state_get_name(PgAioHandleState s); static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation); +static void pgaio_bounce_buffer_wait_for_free(void); + /* Options for io_method. */ const struct config_enum_entry io_method_options[] = { @@ -76,6 +77,7 @@ const struct config_enum_entry io_method_options[] = { /* GUCs */ int io_method = DEFAULT_IO_METHOD; int io_max_concurrency = -1; +int io_bounce_buffers = -1; /* global control for AIO */ PgAioCtl *pgaio_ctl; @@ -662,6 +664,21 @@ pgaio_io_reclaim(PgAioHandle *ioh) if (ioh->state != PGAIO_HS_HANDED_OUT) dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node); + /* reclaim all associated bounce buffers */ + if (!slist_is_empty(&ioh->bounce_buffers)) + { + slist_mutable_iter it; + + slist_foreach_modify(it, &ioh->bounce_buffers) + { + PgAioBounceBuffer *bb = slist_container(PgAioBounceBuffer, node, it.cur); + + slist_delete_current(&it); + + slist_push_head(&pgaio_my_backend->idle_bbs, &bb->node); + } + } + if (ioh->resowner) { ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); @@ -1046,6 +1063,166 @@ pgaio_submit_staged(void) +/* -------------------------------------------------------------------------------- + * Functions primarily related to PgAioBounceBuffer + * -------------------------------------------------------------------------------- + */ + +PgAioBounceBuffer * +pgaio_bounce_buffer_get(void) +{ + PgAioBounceBuffer *bb = NULL; + slist_node *node; + + if (pgaio_my_backend->handed_out_bb != NULL) + elog(ERROR, "can only hand out one BB"); + + /* + * XXX: It probably is not a good idea to have bounce buffers be per + * backend, that's a fair bit of memory. 
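+	 * With the current sizing that is io_bounce_buffers * BLCKSZ bytes of
+	 * shared memory reserved for every possible backend, whether or not it
+	 * ever writes; a shared pool might be a better fit.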
+ */ + if (slist_is_empty(&pgaio_my_backend->idle_bbs)) + { + pgaio_bounce_buffer_wait_for_free(); + } + + node = slist_pop_head_node(&pgaio_my_backend->idle_bbs); + bb = slist_container(PgAioBounceBuffer, node, node); + + pgaio_my_backend->handed_out_bb = bb; + + bb->resowner = CurrentResourceOwner; + ResourceOwnerRememberAioBounceBuffer(bb->resowner, &bb->resowner_node); + + return bb; +} + +void +pgaio_io_assoc_bounce_buffer(PgAioHandle *ioh, PgAioBounceBuffer *bb) +{ + if (pgaio_my_backend->handed_out_bb != bb) + elog(ERROR, "can only assign handed out BB"); + pgaio_my_backend->handed_out_bb = NULL; + + /* + * There can be many bounce buffers assigned in case of vectorized IOs. + */ + slist_push_head(&ioh->bounce_buffers, &bb->node); + + /* once associated with an IO, the IO has ownership */ + ResourceOwnerForgetAioBounceBuffer(bb->resowner, &bb->resowner_node); + bb->resowner = NULL; +} + +uint32 +pgaio_bounce_buffer_id(PgAioBounceBuffer *bb) +{ + return bb - pgaio_ctl->bounce_buffers; +} + +void +pgaio_bounce_buffer_release(PgAioBounceBuffer *bb) +{ + if (pgaio_my_backend->handed_out_bb != bb) + elog(ERROR, "can only release handed out BB"); + + slist_push_head(&pgaio_my_backend->idle_bbs, &bb->node); + pgaio_my_backend->handed_out_bb = NULL; + + ResourceOwnerForgetAioBounceBuffer(bb->resowner, &bb->resowner_node); + bb->resowner = NULL; +} + +void +pgaio_bounce_buffer_release_resowner(dlist_node *bb_node, bool on_error) +{ + PgAioBounceBuffer *bb = dlist_container(PgAioBounceBuffer, resowner_node, bb_node); + + Assert(bb->resowner); + + if (!on_error) + elog(WARNING, "leaked AIO bounce buffer"); + + pgaio_bounce_buffer_release(bb); +} + +char * +pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb) +{ + return bb->buffer; +} + +static void +pgaio_bounce_buffer_wait_for_free(void) +{ + static uint32 lastpos = 0; + + if (pgaio_my_backend->num_staged_ios > 0) + { + pgaio_debug(DEBUG2, "submitting %d, while acquiring free bb", + pgaio_my_backend->num_staged_ios); + pgaio_submit_staged(); + } + + for (uint32 i = lastpos; i < lastpos + io_max_concurrency; i++) + { + uint32 thisoff = pgaio_my_backend->io_handle_off + (i % io_max_concurrency); + PgAioHandle *ioh = &pgaio_ctl->io_handles[thisoff]; + + switch (ioh->state) + { + case PGAIO_HS_IDLE: + case PGAIO_HS_HANDED_OUT: + continue; + case PGAIO_HS_DEFINED: /* should have been submitted above */ + case PGAIO_HS_STAGED: + elog(ERROR, "shouldn't get here with io:%d in state %d", + pgaio_io_get_id(ioh), ioh->state); + break; + case PGAIO_HS_COMPLETED_IO: + case PGAIO_HS_SUBMITTED: + if (!slist_is_empty(&ioh->bounce_buffers)) + { + pgaio_debug_io(DEBUG2, ioh, + "waiting for IO to reclaim BB with %d in flight", + dclist_count(&pgaio_my_backend->in_flight_ios)); + + /* see comment in pgaio_io_wait_for_free() about raciness */ + pgaio_io_wait(ioh, ioh->generation); + + if (slist_is_empty(&pgaio_my_backend->idle_bbs)) + elog(WARNING, "empty after wait"); + + if (!slist_is_empty(&pgaio_my_backend->idle_bbs)) + { + lastpos = i; + return; + } + } + break; + case PGAIO_HS_COMPLETED_SHARED: + case PGAIO_HS_COMPLETED_LOCAL: + /* reclaim */ + pgaio_io_reclaim(ioh); + + if (!slist_is_empty(&pgaio_my_backend->idle_bbs)) + { + lastpos = i; + return; + } + break; + } + } + + /* + * The submission above could have caused the IO to complete at any time. 
+ */ + if (slist_is_empty(&pgaio_my_backend->idle_bbs)) + elog(PANIC, "no more bbs"); +} + + + /* -------------------------------------------------------------------------------- * Other * -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index bf42778a48c70..4e0f2224c175d 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -16,7 +16,6 @@ #include "postgres.h" #include "miscadmin.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/bufmgr.h" #include "storage/md.h" @@ -41,10 +40,13 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = { CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb), CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_WRITEV, aio_shared_buffer_writev_cb), CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb), + CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_WRITEV, aio_local_buffer_writev_cb), #undef CALLBACK_ENTRY }; diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c index 885c3940c6626..d0e982322b80e 100644 --- a/src/backend/storage/aio/aio_init.c +++ b/src/backend/storage/aio/aio_init.c @@ -15,13 +15,12 @@ #include "postgres.h" #include "miscadmin.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" -#include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/proc.h" +#include "storage/procnumber.h" #include "storage/shmem.h" #include "utils/guc.h" @@ -88,6 +87,32 @@ AioHandleDataShmemSize(void) io_max_concurrency)); } +static Size +AioBounceBufferDescShmemSize(void) +{ + Size sz; + + /* PgAioBounceBuffer itself */ + sz = mul_size(sizeof(PgAioBounceBuffer), + mul_size(AioProcs(), io_bounce_buffers)); + + return sz; +} + +static Size +AioBounceBufferDataShmemSize(void) +{ + Size sz; + + /* and the associated buffer */ + sz = mul_size(BLCKSZ, + mul_size(io_bounce_buffers, AioProcs())); + /* memory for alignment */ + sz += BLCKSZ; + + return sz; +} + /* * Choose a suitable value for io_max_concurrency. * @@ -113,6 +138,33 @@ AioChooseMaxConcurrency(void) return Min(max_proportional_pins, 64); } +/* + * Choose a suitable value for io_bounce_buffers. + * + * It's very unlikely to be useful to allocate more bounce buffers for each + * backend than the backend is allowed to pin. Additionally, bounce buffers + * currently are used for writes, it seems very uncommon for more than 10% of + * shared_buffers to be written out concurrently. + * + * XXX: This quickly can take up significant amounts of memory, the logic + * should probably fine tuned. + */ +static int +AioChooseBounceBuffers(void) +{ + uint32 max_backends; + int max_proportional_pins; + + /* Similar logic to LimitAdditionalPins() */ + max_backends = MaxBackends + NUM_AUXILIARY_PROCS; + max_proportional_pins = (NBuffers / 10) / max_backends; + + max_proportional_pins = Max(max_proportional_pins, 1); + + /* apply upper limit */ + return Min(max_proportional_pins, 256); +} + Size AioShmemSize(void) { @@ -136,11 +188,31 @@ AioShmemSize(void) PGC_S_OVERRIDE); } + + /* + * If io_bounce_buffers is -1, we automatically choose a suitable value. + * + * See also comment above. 
+ */ + if (io_bounce_buffers == -1) + { + char buf[32]; + + snprintf(buf, sizeof(buf), "%d", AioChooseBounceBuffers()); + SetConfigOption("io_bounce_buffers", buf, PGC_POSTMASTER, + PGC_S_DYNAMIC_DEFAULT); + if (io_bounce_buffers == -1) /* failed to apply it? */ + SetConfigOption("io_bounce_buffers", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + sz = add_size(sz, AioCtlShmemSize()); sz = add_size(sz, AioBackendShmemSize()); sz = add_size(sz, AioHandleShmemSize()); sz = add_size(sz, AioHandleIOVShmemSize()); sz = add_size(sz, AioHandleDataShmemSize()); + sz = add_size(sz, AioBounceBufferDescShmemSize()); + sz = add_size(sz, AioBounceBufferDataShmemSize()); /* Reserve space for method specific resources. */ if (pgaio_method_ops->shmem_size) @@ -156,6 +228,9 @@ AioShmemInit(void) uint32 io_handle_off = 0; uint32 iovec_off = 0; uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit; + uint32 bounce_buffers_off = 0; + uint32 per_backend_bb = io_bounce_buffers; + char *bounce_buffers_data; pgaio_ctl = (PgAioCtl *) ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found); @@ -167,6 +242,7 @@ AioShmemInit(void) pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency; pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs; + pgaio_ctl->bounce_buffers_count = AioProcs() * per_backend_bb; pgaio_ctl->backend_state = (PgAioBackend *) ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found); @@ -179,6 +255,40 @@ AioShmemInit(void) pgaio_ctl->handle_data = (uint64 *) ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found); + pgaio_ctl->bounce_buffers = (PgAioBounceBuffer *) + ShmemInitStruct("AioBounceBufferDesc", AioBounceBufferDescShmemSize(), + &found); + + bounce_buffers_data = + ShmemInitStruct("AioBounceBufferData", AioBounceBufferDataShmemSize(), + &found); + bounce_buffers_data = + (char *) TYPEALIGN(BLCKSZ, (uintptr_t) bounce_buffers_data); + pgaio_ctl->bounce_buffers_data = bounce_buffers_data; + + + /* Initialize IO handles. */ + for (uint64 i = 0; i < pgaio_ctl->io_handle_count; i++) + { + PgAioHandle *ioh = &pgaio_ctl->io_handles[i]; + + ioh->op = PGAIO_OP_INVALID; + ioh->target = PGAIO_TID_INVALID; + ioh->state = PGAIO_HS_IDLE; + + slist_init(&ioh->bounce_buffers); + } + + /* Initialize Bounce Buffers. 
*/ + for (uint64 i = 0; i < pgaio_ctl->bounce_buffers_count; i++) + { + PgAioBounceBuffer *bb = &pgaio_ctl->bounce_buffers[i]; + + bb->buffer = bounce_buffers_data; + bounce_buffers_data += BLCKSZ; + } + + for (int procno = 0; procno < AioProcs(); procno++) { PgAioBackend *bs = &pgaio_ctl->backend_state[procno]; @@ -186,9 +296,13 @@ AioShmemInit(void) bs->io_handle_off = io_handle_off; io_handle_off += io_max_concurrency; + bs->bounce_buffers_off = bounce_buffers_off; + bounce_buffers_off += per_backend_bb; + dclist_init(&bs->idle_ios); memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE); dclist_init(&bs->in_flight_ios); + slist_init(&bs->idle_bbs); /* initialize per-backend IOs */ for (int i = 0; i < io_max_concurrency; i++) @@ -210,6 +324,14 @@ AioShmemInit(void) dclist_push_tail(&bs->idle_ios, &ioh->node); iovec_off += io_max_combine_limit; } + + /* initialize per-backend bounce buffers */ + for (int i = 0; i < per_backend_bb; i++) + { + PgAioBounceBuffer *bb = &pgaio_ctl->bounce_buffers[bs->bounce_buffers_off + i]; + + slist_push_head(&bs->idle_bbs, &bb->node); + } } out: diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c index 4d31392ddc704..09fa56063988a 100644 --- a/src/backend/storage/aio/aio_io.c +++ b/src/backend/storage/aio/aio_io.c @@ -19,9 +19,7 @@ #include "postgres.h" #include "miscadmin.h" -#include "storage/aio.h" #include "storage/aio_internal.h" -#include "storage/fd.h" #include "utils/wait_event.h" diff --git a/src/backend/storage/aio/aio_target.c b/src/backend/storage/aio/aio_target.c index ac6c74f4ff264..2213bdadaa07c 100644 --- a/src/backend/storage/aio/aio_target.c +++ b/src/backend/storage/aio/aio_target.c @@ -14,7 +14,6 @@ #include "postgres.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/smgr.h" diff --git a/src/backend/storage/aio/io_queue.c b/src/backend/storage/aio/io_queue.c new file mode 100644 index 0000000000000..526aa1d5e06da --- /dev/null +++ b/src/backend/storage/aio/io_queue.c @@ -0,0 +1,204 @@ +/*------------------------------------------------------------------------- + * + * io_queue.c + * AIO - Mechanism for tracking many IOs + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/io_queue.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "lib/ilist.h" +#include "storage/aio.h" +#include "storage/io_queue.h" +#include "utils/resowner.h" + + + +typedef struct TrackedIO +{ + PgAioWaitRef iow; + dlist_node node; +} TrackedIO; + +struct IOQueue +{ + int depth; + int unsubmitted; + + bool has_reserved; + + dclist_head idle; + dclist_head in_progress; + + TrackedIO tracked_ios[FLEXIBLE_ARRAY_MEMBER]; +}; + + +IOQueue * +io_queue_create(int depth, int flags) +{ + size_t sz; + IOQueue *ioq; + + sz = offsetof(IOQueue, tracked_ios) + + sizeof(TrackedIO) * depth; + + ioq = palloc0(sz); + + ioq->depth = 0; + + for (int i = 0; i < depth; i++) + { + TrackedIO *tio = &ioq->tracked_ios[i]; + + pgaio_wref_clear(&tio->iow); + dclist_push_tail(&ioq->idle, &tio->node); + } + + return ioq; +} + +void +io_queue_wait_one(IOQueue *ioq) +{ + /* submit all pending IO before waiting */ + pgaio_submit_staged(); + + while (!dclist_is_empty(&ioq->in_progress)) + { + /* FIXME: Should we really pop here already? 
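+		 * If pgaio_wref_wait() errors out, the entry is on neither list and
+		 * is lost until io_queue_free(); re-linking it onto 'idle' only
+		 * after the wait returns would be safer.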
*/ + dlist_node *node = dclist_pop_head_node(&ioq->in_progress); + TrackedIO *tio = dclist_container(TrackedIO, node, node); + + pgaio_wref_wait(&tio->iow); + dclist_push_head(&ioq->idle, &tio->node); + } +} + +void +io_queue_reserve(IOQueue *ioq) +{ + if (ioq->has_reserved) + return; + + if (dclist_is_empty(&ioq->idle)) + io_queue_wait_one(ioq); + + Assert(!dclist_is_empty(&ioq->idle)); + + ioq->has_reserved = true; +} + +PgAioHandle * +io_queue_acquire_io(IOQueue *ioq) +{ + PgAioHandle *ioh; + + io_queue_reserve(ioq); + + Assert(!dclist_is_empty(&ioq->idle)); + + if (!io_queue_is_empty(ioq)) + { + ioh = pgaio_io_acquire_nb(CurrentResourceOwner, NULL); + if (ioh == NULL) + { + /* + * Need to wait for all IOs, blocking might not be legal in the + * context. + * + * XXX: This doesn't make a whole lot of sense, we're also + * blocking here. What was I smoking when I wrote the above? + */ + io_queue_wait_all(ioq); + ioh = pgaio_io_acquire(CurrentResourceOwner, NULL); + } + } + else + { + ioh = pgaio_io_acquire(CurrentResourceOwner, NULL); + } + + return ioh; +} + +void +io_queue_track(IOQueue *ioq, const struct PgAioWaitRef *iow) +{ + dlist_node *node; + TrackedIO *tio; + + Assert(ioq->has_reserved); + ioq->has_reserved = false; + + Assert(!dclist_is_empty(&ioq->idle)); + + node = dclist_pop_head_node(&ioq->idle); + tio = dclist_container(TrackedIO, node, node); + + tio->iow = *iow; + + dclist_push_tail(&ioq->in_progress, &tio->node); + + ioq->unsubmitted++; + + /* + * XXX: Should have some smarter logic here. We don't want to wait too + * long to submit, that'll mean we're more likely to block. But we also + * don't want to have the overhead of submitting every IO individually. + */ + if (ioq->unsubmitted >= 4) + { + pgaio_submit_staged(); + ioq->unsubmitted = 0; + } +} + +void +io_queue_wait_all(IOQueue *ioq) +{ + /* submit all pending IO before waiting */ + pgaio_submit_staged(); + + while (!dclist_is_empty(&ioq->in_progress)) + { + /* wait for the last IO to minimize unnecessary wakeups */ + dlist_node *node = dclist_tail_node(&ioq->in_progress); + TrackedIO *tio = dclist_container(TrackedIO, node, node); + + if (!pgaio_wref_check_done(&tio->iow)) + { + ereport(DEBUG3, + errmsg("io_queue_wait_all for io:%d", + pgaio_wref_get_id(&tio->iow)), + errhidestmt(true), + errhidecontext(true)); + + pgaio_wref_wait(&tio->iow); + } + + dclist_delete_from(&ioq->in_progress, &tio->node); + dclist_push_head(&ioq->idle, &tio->node); + } +} + +bool +io_queue_is_empty(IOQueue *ioq) +{ + return dclist_is_empty(&ioq->in_progress); +} + +void +io_queue_free(IOQueue *ioq) +{ + io_queue_wait_all(ioq); + + pfree(ioq); +} diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build index da6df2d3654f9..270c4a64428b1 100644 --- a/src/backend/storage/aio/meson.build +++ b/src/backend/storage/aio/meson.build @@ -7,6 +7,7 @@ backend_sources += files( 'aio_init.c', 'aio_io.c', 'aio_target.c', + 'io_queue.c', 'method_io_uring.c', 'method_sync.c', 'method_worker.c', diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index 0bcdab14ae7e3..244918e1883df 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -25,11 +25,12 @@ #include "postgres.h" /* included early, for IOMETHOD_IO_URING_ENABLED */ -#include "storage/aio.h" +#include "storage/aio.h" /* IWYU pragma: keep */ #ifdef IOMETHOD_IO_URING_ENABLED #include +/* IWYU pragma: no_include */ #include "miscadmin.h" #include "storage/aio_internal.h" 
@@ -302,14 +303,41 @@ pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) return num_staged_ios; } +static void +pgaio_uring_completion_error_callback(void *arg) +{ + ProcNumber owner; + PGPROC *owner_proc; + int32 owner_pid; + PgAioHandle *ioh = arg; + + if (!ioh) + return; + + /* No need for context if a backend is completing the IO for itself */ + if (ioh->owner_procno == MyProcNumber) + return; + + owner = ioh->owner_procno; + owner_proc = GetPGProcByNumber(owner); + owner_pid = owner_proc->pid; + + errcontext("completing I/O on behalf of process %d", owner_pid); +} + static void pgaio_uring_drain_locked(PgAioUringContext *context) { int ready; int orig_ready; + ErrorContextCallback errcallback = {0}; Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE)); + errcallback.callback = pgaio_uring_completion_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + /* * Don't drain more events than available right now. Otherwise it's * plausible that one backend could get stuck, for a while, receiving CQEs @@ -337,9 +365,11 @@ pgaio_uring_drain_locked(PgAioUringContext *context) PgAioHandle *ioh; ioh = io_uring_cqe_get_data(cqe); + errcallback.arg = ioh; io_uring_cqe_seen(&context->io_uring_ring, cqe); pgaio_io_process_completion(ioh, cqe->res); + errcallback.arg = NULL; } END_CRIT_SECTION(); @@ -348,6 +378,8 @@ pgaio_uring_drain_locked(PgAioUringContext *context) "drained %d/%d, now expecting %d", ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring)); } + + error_context_stack = errcallback.previous; } static void diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 4a7853d13fac9..52f901ed4c2c8 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -34,13 +34,17 @@ #include "port/pg_bitutils.h" #include "postmaster/auxprocess.h" #include "postmaster/interrupt.h" -#include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/lwlocknames.h" #include "storage/proc.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "storage/waiteventset.h" #include "tcop/tcopprot.h" #include "utils/ps_status.h" #include "utils/wait_event.h" @@ -357,11 +361,33 @@ pgaio_worker_register(void) on_shmem_exit(pgaio_worker_die, 0); } +static void +pgaio_worker_error_callback(void *arg) +{ + ProcNumber owner; + PGPROC *owner_proc; + int32 owner_pid; + PgAioHandle *ioh = arg; + + if (!ioh) + return; + + Assert(ioh->owner_procno != MyProcNumber); + Assert(MyBackendType == B_IO_WORKER); + + owner = ioh->owner_procno; + owner_proc = GetPGProcByNumber(owner); + owner_pid = owner_proc->pid; + + errcontext("I/O worker executing I/O on behalf of process %d", owner_pid); +} + void IoWorkerMain(const void *startup_data, size_t startup_data_len) { sigjmp_buf local_sigjmp_buf; PgAioHandle *volatile error_ioh = NULL; + ErrorContextCallback errcallback = {0}; volatile int error_errno = 0; char cmd[128]; @@ -388,6 +414,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) sprintf(cmd, "%d", MyIoWorkerId); set_ps_display(cmd); + errcallback.callback = pgaio_worker_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + /* see PostgresMain() */ if (sigsetjmp(local_sigjmp_buf, 1) != 0) { @@ -471,6 +501,7 @@ 
IoWorkerMain(const void *startup_data, size_t startup_data_len) ioh = &pgaio_ctl->io_handles[io_index]; error_ioh = ioh; + errcallback.arg = ioh; pgaio_debug_io(DEBUG4, ioh, "worker %d processing IO", @@ -511,6 +542,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) pgaio_io_perform_synchronously(ioh); RESUME_INTERRUPTS(); + errcallback.arg = NULL; } else { @@ -522,6 +554,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) CHECK_FOR_INTERRUPTS(); } + error_context_stack = errcallback.previous; proc_exit(0); } diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 36c54fb695b05..cec93129f582c 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -404,6 +404,30 @@ read_stream_start_pending_read(ReadStream *stream) static void read_stream_look_ahead(ReadStream *stream) { + /* + * Batch-submitting multiple IOs is more efficient than doing so + * one-by-one. If we just ramp up to the max, we'll only be allowed to + * submit one io_combine_limit sized IO. Defer submitting IO in that case. + * + * FIXME: This needs better heuristics. + */ +#if 1 + if (!stream->sync_mode && stream->distance > (io_combine_limit * 8)) + { + if (stream->pinned_buffers + stream->pending_read_nblocks > ((stream->distance * 3) / 4)) + { +#if 0 + ereport(LOG, + errmsg("reduce reduce reduce: pinned: %d, pending: %d, distance: %d", + stream->pinned_buffers, + stream->pending_read_nblocks, + stream->distance)); +#endif + return; + } + } +#endif + /* * Allow amortizing the cost of submitting IO over multiple IOs. This * requires that we don't do any operations that could lead to a deadlock diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 1c37d7dfe2f92..23de216f35849 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -52,6 +52,7 @@ #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/io_queue.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -76,6 +77,7 @@ /* Bits in SyncOneBuffer's return value */ #define BUF_WRITTEN 0x01 #define BUF_REUSABLE 0x02 +#define BUF_CANT_MERGE 0x04 #define RELS_BSEARCH_THRESHOLD 20 @@ -515,8 +517,6 @@ static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); static void BufferSync(int flags); static uint32 WaitBufHdrUnlocked(BufferDesc *buf); -static int SyncOneBuffer(int buf_id, bool skip_recently_used, - WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); @@ -532,6 +532,7 @@ static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_c static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); + static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -1776,6 +1777,18 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) IOObject io_object; bool did_start_io; +#if 0 + ereport(DEBUG3, + errmsg("%s: op->blocks: %d, op->blocks_done: %d, *nblocks_progress: %d, first buf %d", + __func__, + operation->nblocks, + nblocks_done, + *nblocks_progress, + buffers[0]), + errhidestmt(true), + errhidecontext(true)); +#endif + /* * When this IO is executed 
synchronously, either because the caller will * immediately block waiting for the IO or because IOMETHOD_SYNC is used, @@ -1869,6 +1882,13 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) operation->nblocks_done += 1; *nblocks_progress = 1; + ereport(DEBUG3, + errmsg("%s - trunc: %d", + __func__, + operation->nblocks_done), + errhidestmt(true), + errhidecontext(true)); + pgaio_io_release(ioh); pgaio_wref_clear(&operation->io_wref); did_start_io = false; @@ -1915,6 +1935,12 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ for (int i = nblocks_done + 1; i < operation->nblocks; i++) { +#if 0 + /* FIXME: Remove forced short read */ + if (i > 3) + break; +#endif + if (!ReadBuffersCanStartIO(buffers[i], true)) break; /* Must be consecutive block numbers. */ @@ -3321,6 +3347,57 @@ UnpinBufferNoOwner(BufferDesc *buf) } } +typedef struct BuffersToWrite +{ + int nbuffers; + BufferTag start_at_tag; + uint32 max_combine; + + XLogRecPtr max_lsn; + + PgAioHandle *ioh; + PgAioWaitRef iow; + + uint64 total_writes; + + Buffer buffers[IOV_MAX]; + PgAioBounceBuffer *bounce_buffers[IOV_MAX]; + const void *data_ptrs[IOV_MAX]; +} BuffersToWrite; + +static int PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf, + bool skip_recently_used, + IOQueue *ioq, WritebackContext *wb_context); + +static void WriteBuffers(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context); + +static void +BuffersToWriteInit(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context) +{ + to_write->total_writes = 0; + to_write->nbuffers = 0; + to_write->ioh = NULL; + pgaio_wref_clear(&to_write->iow); + to_write->max_lsn = InvalidXLogRecPtr; + + pgaio_enter_batchmode(); +} + +static void +BuffersToWriteEnd(BuffersToWrite *to_write) +{ + if (to_write->ioh != NULL) + { + pgaio_io_release(to_write->ioh); + to_write->ioh = NULL; + } + + pgaio_exit_batchmode(); +} + + #define ST_SORT sort_checkpoint_bufferids #define ST_ELEMENT_TYPE CkptSortItem #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b) @@ -3352,7 +3429,10 @@ BufferSync(int flags) binaryheap *ts_heap; int i; int mask = BM_DIRTY; + IOQueue *ioq; WritebackContext wb_context; + BuffersToWrite to_write; + int max_combine; /* * Unless this is a shutdown checkpoint or we have been explicitly told, @@ -3414,7 +3494,9 @@ BufferSync(int flags) if (num_to_scan == 0) return; /* nothing to do */ + ioq = io_queue_create(512, 0); WritebackContextInit(&wb_context, &checkpoint_flush_after); + max_combine = Min(io_bounce_buffers, io_combine_limit); TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); @@ -3522,48 +3604,91 @@ BufferSync(int flags) */ num_processed = 0; num_written = 0; + + BuffersToWriteInit(&to_write, ioq, &wb_context); + while (!binaryheap_empty(ts_heap)) { BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); + bool batch_continue = true; - buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); - - bufHdr = GetBufferDescriptor(buf_id); - - num_processed++; + Assert(ts_stat->num_scanned <= ts_stat->num_to_scan); /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. 
However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. + * Collect a batch of buffers to write out from the current + * tablespace. That causes some imbalance between the tablespaces, but + * that's more than outweighed by the efficiency gain due to batching. */ - if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) + while (batch_continue && + to_write.nbuffers < max_combine && + ts_stat->num_scanned < ts_stat->num_to_scan) { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + buf_id = CkptBufferIds[ts_stat->index].buf_id; + Assert(buf_id != -1); + + bufHdr = GetBufferDescriptor(buf_id); + + num_processed++; + + /* + * We don't need to acquire the lock here, because we're only + * looking at a single bit. It's possible that someone else writes + * the buffer and clears the flag right after we check, but that + * doesn't matter since SyncOneBuffer will then do nothing. + * However, there is a further race condition: it's conceivable + * that between the time we examine the bit here and the time + * SyncOneBuffer acquires the lock, someone else not only wrote + * the buffer but replaced it with another page and dirtied it. In + * that improbable case, SyncOneBuffer will write the buffer + * though we didn't need to. It doesn't seem worth guarding + * against this, though. + */ + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) + { + int result = PrepareToWriteBuffer(&to_write, buf_id + 1, false, + ioq, &wb_context); + + if (result & BUF_CANT_MERGE) + { + Assert(to_write.nbuffers > 0); + WriteBuffers(&to_write, ioq, &wb_context); + + result = PrepareToWriteBuffer(&to_write, buf_id + 1, false, + ioq, &wb_context); + Assert(result != BUF_CANT_MERGE); + } + + if (result & BUF_WRITTEN) + { + TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); + PendingCheckpointerStats.buffers_written++; + num_written++; + } + else + { + batch_continue = false; + } + } + else { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, &wb_context); } + + /* + * Measure progress independent of actually having to flush the + * buffer - otherwise writing become unbalanced. + */ + ts_stat->progress += ts_stat->progress_slice; + ts_stat->num_scanned++; + ts_stat->index++; } - /* - * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. - */ - ts_stat->progress += ts_stat->progress_slice; - ts_stat->num_scanned++; - ts_stat->index++; + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, &wb_context); + /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) @@ -3581,15 +3706,23 @@ BufferSync(int flags) * * (This will check for barrier events even if it doesn't sleep.) */ - CheckpointWriteDelay(flags, (double) num_processed / num_to_scan); + CheckpointWriteDelay(ioq, flags, (double) num_processed / num_to_scan); } + Assert(to_write.nbuffers == 0); + io_queue_wait_all(ioq); + /* * Issue all pending flushes. Only checkpointer calls BufferSync(), so * IOContext will always be IOCONTEXT_NORMAL. 
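+	 *
+	 * (Note that WriteBuffers() does not schedule writebacks yet - see the
+	 * FIXME there - so wb_context may well be empty at this point.)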
*/ IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL); + io_queue_wait_all(ioq); /* IssuePendingWritebacks might have added + * more */ + io_queue_free(ioq); + BuffersToWriteEnd(&to_write); + pfree(per_ts_stat); per_ts_stat = NULL; binaryheap_free(ts_heap); @@ -3615,7 +3748,7 @@ BufferSync(int flags) * bgwriter_lru_maxpages to 0.) */ bool -BgBufferSync(WritebackContext *wb_context) +BgBufferSync(IOQueue *ioq, WritebackContext *wb_context) { /* info obtained from freelist.c */ int strategy_buf_id; @@ -3658,6 +3791,9 @@ BgBufferSync(WritebackContext *wb_context) long new_strategy_delta; uint32 new_recent_alloc; + BuffersToWrite to_write; + int max_combine; + /* * Find out where the freelist clock sweep currently is, and how many * buffer allocations have happened since our last call. @@ -3678,6 +3814,8 @@ BgBufferSync(WritebackContext *wb_context) return true; } + max_combine = Min(io_bounce_buffers, io_combine_limit); + /* * Compute strategy_delta = how many buffers have been scanned by the * clock sweep since last time. If first time through, assume none. Then @@ -3834,11 +3972,25 @@ BgBufferSync(WritebackContext *wb_context) num_written = 0; reusable_buffers = reusable_buffers_est; + BuffersToWriteInit(&to_write, ioq, wb_context); + /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state; + + sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1, + true, ioq, wb_context); + if (sync_state & BUF_CANT_MERGE) + { + Assert(to_write.nbuffers > 0); + + WriteBuffers(&to_write, ioq, wb_context); + + sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1, + true, ioq, wb_context); + Assert(sync_state != BUF_CANT_MERGE); + } if (++next_to_clean >= NBuffers) { @@ -3849,6 +4001,13 @@ BgBufferSync(WritebackContext *wb_context) if (sync_state & BUF_WRITTEN) { + Assert(sync_state & BUF_REUSABLE); + + if (to_write.nbuffers == max_combine) + { + WriteBuffers(&to_write, ioq, wb_context); + } + reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) { @@ -3860,6 +4019,11 @@ BgBufferSync(WritebackContext *wb_context) reusable_buffers++; } + if (to_write.nbuffers > 0) + WriteBuffers(&to_write, ioq, wb_context); + + BuffersToWriteEnd(&to_write); + PendingBgWriterStats.buf_written_clean += num_written; #ifdef BGW_DEBUG @@ -3898,8 +4062,66 @@ BgBufferSync(WritebackContext *wb_context) return (bufs_to_lap == 0 && recent_alloc == 0); } +static inline bool +BufferTagsSameRel(const BufferTag *tag1, const BufferTag *tag2) +{ + return (tag1->spcOid == tag2->spcOid) && + (tag1->dbOid == tag2->dbOid) && + (tag1->relNumber == tag2->relNumber) && + (tag1->forkNum == tag2->forkNum) + ; +} + +static bool +CanMergeWrite(BuffersToWrite *to_write, BufferDesc *cur_buf_hdr) +{ + BlockNumber cur_block = cur_buf_hdr->tag.blockNum; + + Assert(to_write->nbuffers > 0); /* can't merge with nothing */ + Assert(to_write->start_at_tag.relNumber != InvalidOid); + Assert(to_write->start_at_tag.blockNum != InvalidBlockNumber); + + Assert(to_write->ioh != NULL); + + /* + * First check if the blocknumber is one that we could actually merge, + * that's cheaper than checking the tablespace/db/relnumber/fork match. + */ + if (to_write->start_at_tag.blockNum + to_write->nbuffers != cur_block) + return false; + + if (!BufferTagsSameRel(&to_write->start_at_tag, &cur_buf_hdr->tag)) + return false; + + /* + * Need to check with smgr how large a write we're allowed to make. 
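+	 * (For md.c, smgrmaxcombine() reports the number of blocks up to the
+	 * end of the current file segment, so merged writes never cross a
+	 * segment boundary.)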
To + * reduce the overhead of the smgr check, only inquire once, when + * processing the first to-be-merged buffer. That avoids the overhead in + * the common case of writing out buffers that definitely not mergeable. + */ + if (to_write->nbuffers == 1) + { + SMgrRelation smgr; + + smgr = smgropen(BufTagGetRelFileLocator(&to_write->start_at_tag), INVALID_PROC_NUMBER); + + to_write->max_combine = smgrmaxcombine(smgr, + to_write->start_at_tag.forkNum, + to_write->start_at_tag.blockNum); + } + else + { + Assert(to_write->max_combine > 0); + } + + if (to_write->start_at_tag.blockNum + to_write->max_combine <= cur_block) + return false; + + return true; +} + /* - * SyncOneBuffer -- process a single buffer during syncing. + * PrepareToWriteBuffer -- process a single buffer during syncing. * * If skip_recently_used is true, we don't write currently-pinned buffers, nor * buffers marked recently used, as these are not replacement candidates. @@ -3908,22 +4130,50 @@ BgBufferSync(WritebackContext *wb_context) * BUF_WRITTEN: we wrote the buffer. * BUF_REUSABLE: buffer is available for replacement, ie, it has * pin count 0 and usage count 0. + * BUF_CANT_MERGE: can't combine this write with prior writes, caller needs + * to issue those first * * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean * after locking it, but we don't care all that much.) */ static int -SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) +PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf, + bool skip_recently_used, + IOQueue *ioq, WritebackContext *wb_context) { - BufferDesc *bufHdr = GetBufferDescriptor(buf_id); - int result = 0; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(buf - 1); uint32 buf_state; - BufferTag tag; + int result = 0; + XLogRecPtr cur_buf_lsn; + LWLock *content_lock; + bool may_block; + + /* + * Check if this buffer can be written out together with already prepared + * writes. We check before we have pinned the buffer, so the buffer can be + * written out and replaced between this check and us pinning the buffer - + * we'll recheck below. The reason for the pre-check is that we don't want + * to pin the buffer just to find out that we can't merge the IO. + */ + if (to_write->nbuffers != 0) + { + if (!CanMergeWrite(to_write, cur_buf_hdr)) + { + result |= BUF_CANT_MERGE; + return result; + } + } + else + { + to_write->start_at_tag = cur_buf_hdr->tag; + } /* Make sure we can handle the pin */ ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); + /* XXX: Should also check if we are allowed to pin one more buffer */ + /* * Check whether buffer needs writing. * @@ -3933,7 +4183,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. 
*/ - buf_state = LockBufHdr(bufHdr); + buf_state = LockBufHdr(cur_buf_hdr); if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && BUF_STATE_GET_USAGECOUNT(buf_state) == 0) @@ -3942,40 +4192,300 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) } else if (skip_recently_used) { +#if 0 + elog(LOG, "at block %d: skip recent with nbuffers %d", + cur_buf_hdr->tag.blockNum, to_write->nbuffers); +#endif /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr, buf_state); + UnlockBufHdr(cur_buf_hdr, buf_state); return result; } if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr, buf_state); + UnlockBufHdr(cur_buf_hdr, buf_state); return result; } + /* pin the buffer, from now on its identity can't change anymore */ + PinBuffer_Locked(cur_buf_hdr); + /* - * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the - * buffer is clean by the time we've locked it.) + * Acquire IO, if needed, now that it's likely that we'll need to write. */ - PinBuffer_Locked(bufHdr); - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); + if (to_write->ioh == NULL) + { + /* otherwise we should already have acquired a handle */ + Assert(to_write->nbuffers == 0); - FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); + to_write->ioh = io_queue_acquire_io(ioq); + pgaio_io_get_wref(to_write->ioh, &to_write->iow); + } - LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); + /* + * If we are merging, check if the buffer's identity possibly changed + * while we hadn't yet pinned it. + * + * XXX: It might be worth checking if we still want to write the buffer + * out, e.g. it could have been replaced with a buffer that doesn't have + * BM_CHECKPOINT_NEEDED set. + */ + if (to_write->nbuffers != 0) + { + if (!CanMergeWrite(to_write, cur_buf_hdr)) + { + elog(LOG, "changed identity"); + UnpinBuffer(cur_buf_hdr); - tag = bufHdr->tag; + result |= BUF_CANT_MERGE; + + return result; + } + } - UnpinBuffer(bufHdr); + may_block = to_write->nbuffers == 0 + && !pgaio_have_staged() + && io_queue_is_empty(ioq) + ; + content_lock = BufferDescriptorGetContentLock(cur_buf_hdr); + + if (!may_block) + { + if (LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + /* done */ + } + else if (to_write->nbuffers == 0) + { + /* + * Need to wait for all prior IO to finish before blocking for + * lock acquisition, to avoid the risk a deadlock due to us + * waiting for another backend that is waiting for our unsubmitted + * IO to complete. 
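+			 * (For example, the holder of the content lock might itself be
+			 * blocked in pgaio_wref_wait() on an IO that still sits,
+			 * unsubmitted, in our staged batch.)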
+ */ + pgaio_submit_staged(); + io_queue_wait_all(ioq); + + elog(DEBUG2, "at block %u: can't block, nbuffers = 0", + cur_buf_hdr->tag.blockNum + ); + + may_block = to_write->nbuffers == 0 + && !pgaio_have_staged() + && io_queue_is_empty(ioq) + ; + Assert(may_block); + + LWLockAcquire(content_lock, LW_SHARED); + } + else + { + elog(DEBUG2, "at block %d: can't block nbuffers = %d", + cur_buf_hdr->tag.blockNum, + to_write->nbuffers); + + UnpinBuffer(cur_buf_hdr); + result |= BUF_CANT_MERGE; + Assert(to_write->nbuffers > 0); + + return result; + } + } + else + { + LWLockAcquire(content_lock, LW_SHARED); + } + + if (!may_block) + { + if (!StartBufferIO(cur_buf_hdr, false, !may_block)) + { + pgaio_submit_staged(); + io_queue_wait_all(ioq); + + may_block = io_queue_is_empty(ioq) && to_write->nbuffers == 0 && !pgaio_have_staged(); + + if (!StartBufferIO(cur_buf_hdr, false, !may_block)) + { + elog(DEBUG2, "at block %d: non-waitable StartBufferIO returns false, %d", + cur_buf_hdr->tag.blockNum, + may_block); + + /* + * FIXME: can't tell whether this is because the buffer has + * been cleaned + */ + if (!may_block) + { + result |= BUF_CANT_MERGE; + Assert(to_write->nbuffers > 0); + } + LWLockRelease(content_lock); + UnpinBuffer(cur_buf_hdr); + + return result; + } + } + } + else + { + if (!StartBufferIO(cur_buf_hdr, false, false)) + { + elog(DEBUG2, "waitable StartBufferIO returns false"); + LWLockRelease(content_lock); + UnpinBuffer(cur_buf_hdr); + + /* + * FIXME: Historically we returned BUF_WRITTEN in this case, which + * seems wrong + */ + return result; + } + } /* - * SyncOneBuffer() is only called by checkpointer and bgwriter, so - * IOContext will always be IOCONTEXT_NORMAL. + * Run PageGetLSN while holding header lock, since we don't have the + * buffer locked exclusively in all cases. */ - ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); + buf_state = LockBufHdr(cur_buf_hdr); + + cur_buf_lsn = BufferGetLSN(cur_buf_hdr); - return result | BUF_WRITTEN; + /* To check if block content changes while flushing. - vadim 01/17/97 */ + buf_state &= ~BM_JUST_DIRTIED; + + UnlockBufHdr(cur_buf_hdr, buf_state); + + to_write->buffers[to_write->nbuffers] = buf; + to_write->nbuffers++; + + if (buf_state & BM_PERMANENT && + (to_write->max_lsn == InvalidXLogRecPtr || to_write->max_lsn < cur_buf_lsn)) + { + to_write->max_lsn = cur_buf_lsn; + } + + result |= BUF_WRITTEN; + + return result; +} + +static void +WriteBuffers(BuffersToWrite *to_write, + IOQueue *ioq, WritebackContext *wb_context) +{ + SMgrRelation smgr; + Buffer first_buf; + BufferDesc *first_buf_hdr; + bool needs_checksum; + + Assert(to_write->nbuffers > 0 && to_write->nbuffers <= io_combine_limit); + + first_buf = to_write->buffers[0]; + first_buf_hdr = GetBufferDescriptor(first_buf - 1); + + smgr = smgropen(BufTagGetRelFileLocator(&first_buf_hdr->tag), INVALID_PROC_NUMBER); + + /* + * Force XLOG flush up to buffer's LSN. This implements the basic WAL + * rule that log updates must hit disk before any of the data-file changes + * they describe do. + * + * However, this rule does not apply to unlogged relations, which will be + * lost after a crash anyway. Most unlogged relation pages do not bear + * LSNs since we never emit WAL records for them, and therefore flushing + * up through the buffer LSN would be useless, but harmless. However, + * GiST indexes use LSNs internally to track page-splits, and therefore + * unlogged GiST pages bear "fake" LSNs generated by + * GetFakeLSNForUnloggedRel. 
It is unlikely but possible that the fake + * LSN counter could advance past the WAL insertion point; and if it did + * happen, attempting to flush WAL through that location would fail, with + * disastrous system-wide consequences. To make sure that can't happen, + * skip the flush if the buffer isn't permanent. + */ + if (to_write->max_lsn != InvalidXLogRecPtr) + XLogFlush(to_write->max_lsn); + + /* + * Now it's safe to write the buffer to disk. Note that no one else should + * have been able to write it, while we were busy with log flushing, + * because we got the exclusive right to perform I/O by setting the + * BM_IO_IN_PROGRESS bit. + */ + + for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++) + { + Buffer cur_buf = to_write->buffers[nbuf]; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1); + Block bufBlock; + char *bufToWrite; + + bufBlock = BufHdrGetBlock(cur_buf_hdr); + needs_checksum = PageNeedsChecksumCopy((Page) bufBlock); + + /* + * Update page checksum if desired. Since we have only shared lock on + * the buffer, other processes might be updating hint bits in it, so + * we must copy the page to a bounce buffer if we do checksumming. + */ + if (needs_checksum) + { + PgAioBounceBuffer *bb = pgaio_bounce_buffer_get(); + + pgaio_io_assoc_bounce_buffer(to_write->ioh, bb); + + bufToWrite = pgaio_bounce_buffer_buffer(bb); + memcpy(bufToWrite, bufBlock, BLCKSZ); + PageSetChecksumInplace((Page) bufToWrite, cur_buf_hdr->tag.blockNum); + } + else + { + bufToWrite = bufBlock; + } + + to_write->data_ptrs[nbuf] = bufToWrite; + } + + pgaio_io_set_handle_data_32(to_write->ioh, + (uint32 *) to_write->buffers, + to_write->nbuffers); + pgaio_io_register_callbacks(to_write->ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV, 0); + + smgrstartwritev(to_write->ioh, smgr, + BufTagGetForkNum(&first_buf_hdr->tag), + first_buf_hdr->tag.blockNum, + to_write->data_ptrs, + to_write->nbuffers, + false); + pgstat_count_io_op(IOOBJECT_RELATION, IOCONTEXT_NORMAL, + IOOP_WRITE, 1, BLCKSZ * to_write->nbuffers); + + + for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++) + { + Buffer cur_buf = to_write->buffers[nbuf]; + BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1); + + UnpinBuffer(cur_buf_hdr); + } + + io_queue_track(ioq, &to_write->iow); + to_write->total_writes++; + + /* clear state for next write */ + to_write->nbuffers = 0; + to_write->start_at_tag.relNumber = InvalidOid; + to_write->start_at_tag.blockNum = InvalidBlockNumber; + to_write->max_combine = 0; + to_write->max_lsn = InvalidXLogRecPtr; + to_write->ioh = NULL; + pgaio_wref_clear(&to_write->iow); + + /* + * FIXME: Implement issuing writebacks (note wb_context isn't used here). + * Possibly needs to be integrated with io_queue.c. + */ } /* @@ -4349,6 +4859,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. @@ -5536,7 +6047,15 @@ LockBuffer(Buffer buffer, int mode) else if (mode == BUFFER_LOCK_SHARE) LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED); else if (mode == BUFFER_LOCK_EXCLUSIVE) + { + /* + * FIXME: Wait for AIO writes, otherwise there would be a risk of + * deadlock. This isn't entirely trivial to do in a race-free way, IO + * could be started between us checking whether there is IO and + * enqueueing ourselves for the lock. 
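+		 * Presumably this requires re-checking for in-progress IO after
+		 * enqueueing on the lock, and waiting for that IO before actually
+		 * sleeping on the lock.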
+ */ LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE); + } else elog(ERROR, "unrecognized buffer lock mode: %d", mode); } @@ -5551,6 +6070,19 @@ ConditionalLockBuffer(Buffer buffer) { BufferDesc *buf; + /* + * FIXME: Wait for AIO writes. Some code does not deal well + * ConditionalLockBuffer() continuously failing, e.g. + * spginsert()->spgdoinsert() ends up busy-looping (easily reproducible by + * just making this function always fail and running the regression + * tests). While that code could be fixed, it'd be hard to find all + * problematic places. + * + * It would be OK to wait for the IO as waiting for IO completion does not + * need to wait for any locks that could lead to an undetected deadlock or + * such. + */ + Assert(BufferIsPinned(buffer)); if (BufferIsLocal(buffer)) return true; /* act as though we got it */ @@ -5614,10 +6146,8 @@ LockBufferForCleanup(Buffer buffer) CheckBufferIsPinnedOnce(buffer); /* - * We do not yet need to be worried about in-progress AIOs holding a pin, - * as we, so far, only support doing reads via AIO and this function can - * only be called once the buffer is valid (i.e. no read can be in - * flight). + * FIXME: See AIO related comments in LockBuffer() and + * ConditionalLockBuffer() */ /* Nobody else to wait for */ @@ -5630,6 +6160,11 @@ LockBufferForCleanup(Buffer buffer) { uint32 buf_state; + /* + * FIXME: LockBuffer()'s handling of in-progress writes (once + * implemented) should suffice to deal with deadlock risk. + */ + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); buf_state = LockBufHdr(bufHdr); @@ -5777,7 +6312,13 @@ ConditionalLockBufferForCleanup(Buffer buffer) Assert(BufferIsValid(buffer)); - /* see AIO related comment in LockBufferForCleanup() */ + /* + * FIXME: Should wait for IO for the same reason as in + * ConditionalLockBuffer(). Needs to happen before the + * ConditionalLockBuffer() call below, as we'd never reach the + * ConditionalLockBuffer() call due the buffer pin held for the duration + * of the IO. + */ if (BufferIsLocal(buffer)) { @@ -5834,7 +6375,10 @@ IsBufferCleanupOK(Buffer buffer) Assert(BufferIsValid(buffer)); - /* see AIO related comment in LockBufferForCleanup() */ + /* + * FIXME: See AIO related comments in LockBuffer() and + * ConditionalLockBuffer() + */ if (BufferIsLocal(buffer)) { @@ -7140,12 +7684,129 @@ buffer_readv_report(PgAioResult result, const PgAioTargetData *td, affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0); } +/* + * Helper for AIO writev completion callbacks, supporting both shared and temp + * buffers. Gets called once for each buffer in a multi-page write. + */ +static pg_attribute_always_inline PgAioResult +buffer_writev_complete_one(uint8 buf_off, Buffer buffer, uint8 flags, + bool failed, bool is_temp) +{ + BufferDesc *buf_hdr = is_temp ? + GetLocalBufferDescriptor(-buffer - 1) + : GetBufferDescriptor(buffer - 1); + PgAioResult result = {.status = PGAIO_RS_OK}; + bool clear_dirty; + uint32 set_flag_bits; + +#ifdef USE_ASSERT_CHECKING + { + uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state); + + Assert(buf_state & BM_VALID); + Assert(buf_state & BM_TAG_VALID); + /* temp buffers don't use BM_IO_IN_PROGRESS */ + if (!is_temp) + Assert(buf_state & BM_IO_IN_PROGRESS); + Assert(buf_state & BM_DIRTY); + } +#endif + + clear_dirty = failed ? false : true; + set_flag_bits = failed ? 
BM_IO_ERROR : 0; + + if (is_temp) + TerminateLocalBufferIO(buf_hdr, clear_dirty, set_flag_bits, true); + else + TerminateBufferIO(buf_hdr, clear_dirty, set_flag_bits, false, true); + + /* + * The initiator of the IO is no longer managing the lock (it called + * LWLockDisown()); we are, so release it here. + */ + if (!is_temp) + LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr), + LW_SHARED); + + /* FIXME: tracepoint */ + + return result; +} + +/* + * Perform completion handling of a single AIO write. This write may cover + * multiple blocks / buffers. + * + * Shared between shared and local buffers, to reduce code duplication. + */ +static pg_attribute_always_inline PgAioResult +buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data, bool is_temp) +{ + PgAioResult result = prior_result; + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + uint64 *io_data; + uint8 handle_data_len; + + if (is_temp) + { + Assert(td->smgr.is_temp); + Assert(pgaio_io_get_owner(ioh) == MyProcNumber); + } + else + Assert(!td->smgr.is_temp); + + /* + * Iterate over all the buffers affected by this IO and call the + * appropriate per-buffer completion function for each buffer. + */ + io_data = pgaio_io_get_handle_data(ioh, &handle_data_len); + for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++) + { + Buffer buf = io_data[buf_off]; + PgAioResult buf_result; + bool failed; + + Assert(BufferIsValid(buf)); + + /* + * If the entire IO failed at a lower level, each buffer needs to be + * marked as failed. In case of a partial write, some buffers may be + * ok. + */ + failed = + prior_result.status == PGAIO_RS_ERROR + || prior_result.result <= buf_off; + + buf_result = buffer_writev_complete_one(buf_off, buf, cb_data, failed, + is_temp); + + /* + * If there wasn't any prior error and the IO for this page failed in + * some form, set the whole IO's result to this page's result. + */ + if (result.status != PGAIO_RS_ERROR && buf_result.status != PGAIO_RS_OK) + { + result = buf_result; + pgaio_result_report(result, td, LOG); + } + } + + return result; +} + static void shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) { buffer_stage_common(ioh, false, false); } +static void +shared_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data) +{ + buffer_stage_common(ioh, true, false); +} + static PgAioResult shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) @@ -7191,6 +7852,13 @@ shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result, return prior_result; } +static PgAioResult +shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, + uint8 cb_data) +{ + return buffer_writev_complete(ioh, prior_result, cb_data, false); +} + static void local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data) { @@ -7204,6 +7872,17 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, return buffer_readv_complete(ioh, prior_result, cb_data, true); } +static void +local_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data) +{ + /* + * Currently this is unreachable as the only write support is for + * checkpointer / bgwriter, which don't deal with local buffers.
+ */ + elog(ERROR, "should be unreachable"); +} + + /* readv callback is passed READ_BUFFERS_* flags as callback data */ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { .stage = shared_buffer_readv_stage, @@ -7213,6 +7892,11 @@ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = { .report = buffer_readv_report, }; +const PgAioHandleCallbacks aio_shared_buffer_writev_cb = { + .stage = shared_buffer_writev_stage, + .complete_shared = shared_buffer_writev_complete, +}; + /* readv callback is passed READ_BUFFERS_* flags as callback data */ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .stage = local_buffer_readv_stage, @@ -7226,3 +7910,7 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .complete_local = local_buffer_readv_complete, .report = buffer_readv_report, }; + +const PgAioHandleCallbacks aio_local_buffer_writev_cb = { + .stage = local_buffer_writev_stage, +}; diff --git a/src/backend/storage/buffer/freelist.c index 336715b6c6342..b72a5957a206a 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -555,7 +555,12 @@ GetAccessStrategy(BufferAccessStrategyType btype) return NULL; case BAS_BULKREAD: - ring_size_kb = 256; + + /* + * FIXME: Temporary increase to allow large enough streaming reads + * to actually benefit from AIO. This needs a better solution. + */ + ring_size_kb = 2 * 1024; break; case BAS_BULKWRITE: ring_size_kb = 16 * 1024; diff --git a/src/backend/storage/file/fd.c index 0e8299dd55646..2138d47dab930 100644 --- a/src/backend/storage/file/fd.c +++ b/src/backend/storage/file/fd.c @@ -2348,6 +2348,34 @@ FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, return returnCode; } +int +FileStartWriteV(PgAioHandle *ioh, File file, + int iovcnt, off_t offset, + uint32 wait_event_info) +{ + int returnCode; + Vfd *vfdP; + + Assert(FileIsValid(file)); + + DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d", + file, VfdCache[file].fileName, + (int64) offset, + iovcnt)); + + returnCode = FileAccess(file); + if (returnCode < 0) + return returnCode; + + vfdP = &VfdCache[file]; + + /* FIXME: think about / reimplement temp_file_limit */ + + pgaio_io_start_writev(ioh, vfdP->fd, iovcnt, offset); + + return 0; +} + int FileSync(File file, uint32 wait_event_info) { diff --git a/src/backend/storage/page/bufpage.c index 82457bacc62fa..d4bbd2cdf00e9 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1490,6 +1490,16 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum, return true; } +bool +PageNeedsChecksumCopy(Page page) +{ + if (PageIsNew(page)) + return false; + + /* a stable copy for checksumming is only needed if checksums are enabled */ + return DataChecksumsEnabled(); +} + /* * Set checksum for a page in shared buffers.
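[Editor's note: to make the intended use of PageNeedsChecksumCopy() easier to follow, here is a minimal sketch, not part of the patch, of the copy-then-checksum pattern used in the write path above. prepare_block_for_write() is a hypothetical name; pinning, locking, and error handling are assumed to be managed by the caller, and "ioh" must be the handle the write will later be issued with, so the bounce buffer's lifetime is tied to the IO.]

```c
static char *
prepare_block_for_write(PgAioHandle *ioh, Page page, BlockNumber blkno)
{
	char	   *src = (char *) page;

	if (PageNeedsChecksumCopy(page))
	{
		/* page may still see hint-bit updates; checksum a stable copy */
		PgAioBounceBuffer *bb = pgaio_bounce_buffer_get();

		/* ownership passes to the IO; released when the IO completes */
		pgaio_io_assoc_bounce_buffer(ioh, bb);

		src = pgaio_bounce_buffer_buffer(bb);
		memcpy(src, page, BLCKSZ);
		PageSetChecksumInplace((Page) src, blkno);
	}

	return src;
}
```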
diff --git a/src/backend/storage/smgr/md.c index f99c9d9001342..f1b09ec3e984f 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); +static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data); +static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel); const PgAioHandleCallbacks aio_md_readv_cb = { .complete_shared = md_readv_complete, .report = md_readv_report, }; +const PgAioHandleCallbacks aio_md_writev_cb = { + .complete_shared = md_writev_complete, + .report = md_writev_report, +}; + static inline int _mdfd_open_flags(void) @@ -1143,6 +1150,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, } } +/* + * mdstartwritev() -- Asynchronous version of mdwritev(). + */ +void +mdstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) +{ + off_t seekpos; + MdfdVec *v; + BlockNumber nblocks_this_segment; + struct iovec *iov; + int iovcnt; + int ret; + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); + + seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + nblocks_this_segment = + Min(nblocks, + RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE))); + + if (nblocks_this_segment != nblocks) + elog(ERROR, "write crossing segment boundary"); + + iovcnt = pgaio_io_get_iovec(ioh, &iov); + + Assert(nblocks <= iovcnt); + + iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment); + + Assert(iovcnt <= nblocks_this_segment); + + if (!(io_direct_flags & IO_DIRECT_DATA)) + pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED); + + pgaio_io_set_target_smgr(ioh, + reln, + forknum, + blocknum, + nblocks, + skipFsync); + pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0); + + ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE); + if (ret != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not start writing blocks %u..%u in file \"%s\": %m", + blocknum, + blocknum + nblocks_this_segment - 1, + FilePathName(v->mdfd_vfd)))); +} +
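[Editor's note: a sketch, not part of the patch, of how a caller might issue one asynchronous write through the smgr layer and remember it for later draining. queue_block_write() is hypothetical; the acquire/backpressure semantics of io_queue_acquire_io() are assumed from its name and the declarations in io_queue.h. The pages are assumed write-stable (see the bounce-buffer logic above), and blocknum .. blocknum + nblocks - 1 must stay within one segment, which mdstartwritev() otherwise rejects with an ERROR.]

```c
static void
queue_block_write(IOQueue *ioq, SMgrRelation reln, ForkNumber forknum,
				  BlockNumber blocknum, const void **pages,
				  BlockNumber nblocks)
{
	PgAioHandle *ioh;
	PgAioWaitRef iow;

	/* assumed to reserve/wait until an IO handle is available */
	ioh = io_queue_acquire_io(ioq);
	pgaio_io_get_wref(ioh, &iow);

	/* mdstartwritev() registers the PGAIO_HCB_MD_WRITEV callbacks itself */
	smgrstartwritev(ioh, reln, forknum, blocknum, pages, nblocks,
					false /* skipFsync */ );

	/* remember the IO so io_queue_wait_all() can drain it later */
	io_queue_track(ioq, &iow);
}
```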
/* * mdwriteback() -- Tell the kernel to write pages back to storage. @@ -1531,6 +1596,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) } } +/* + * Like register_dirty_segment(), except for use by AIO. In the completion + * callback we don't have access to the MdfdVec (the completion callback might + * be executed in a different backend than the issuing backend), therefore we + * have to implement this slightly differently. + */ +static void +register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno) +{ + FileTag tag; + + INIT_MD_FILETAG(tag, locator, forknum, segno); + + /* + * Can't block here waiting for checkpointer to accept our sync request, + * as checkpointer might be waiting for this AIO to finish if offloaded to + * a worker. + */ + if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ )) + { + char path[MAXPGPATH]; + + ereport(DEBUG1, + (errmsg_internal("could not forward fsync request because request queue is full"))); + + /* reuse mdsyncfiletag() to avoid duplicating code */ + if (mdsyncfiletag(&tag, path)) + ereport(data_sync_elevel(ERROR), + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", + path))); + } +} + /* * register_unlink_segment() -- Schedule a file to be deleted after next checkpoint */ @@ -2065,3 +2164,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel) td->smgr.nblocks * (size_t) BLCKSZ)); } } + +/* + * AIO completion callback for mdstartwritev(). + */ +static PgAioResult +md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data) +{ + PgAioTargetData *td = pgaio_io_get_target_data(ioh); + PgAioResult result = prior_result; + + if (prior_result.result < 0) + { + result.status = PGAIO_RS_ERROR; + result.id = PGAIO_HCB_MD_WRITEV; + /* For "hard" errors, track the error number in error_data */ + result.error_data = -prior_result.result; + result.result = 0; + + pgaio_result_report(result, td, LOG); + + return result; + } + + /* + * As explained above smgrstartwritev(), the smgr API operates on the + * level of blocks, rather than bytes. Convert. + */ + result.result /= BLCKSZ; + + Assert(result.result <= td->smgr.nblocks); + + if (result.result == 0) + { + /* consider 0 blocks written a failure */ + result.status = PGAIO_RS_ERROR; + result.id = PGAIO_HCB_MD_WRITEV; + result.error_data = 0; + + pgaio_result_report(result, td, LOG); + + return result; + } + + if (result.status != PGAIO_RS_ERROR && + result.result < td->smgr.nblocks) + { + /* partial writes should be retried at upper level */ + result.status = PGAIO_RS_PARTIAL; + result.id = PGAIO_HCB_MD_WRITEV; + } + + if (!td->smgr.skip_fsync) + register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum, + td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE)); + + return result; +} + +/* + * AIO error reporting callback for mdstartwritev(). + */ +static void +md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel) +{ + RelPathStr path; + + path = relpathbackend(td->smgr.rlocator, + td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER, + td->smgr.forkNum); + + if (result.error_data != 0) + { + errno = result.error_data; /* for errcode_for_file_access() */ + + ereport(elevel, + errcode_for_file_access(), + errmsg("could not write blocks %u..%u in file \"%s\": %m", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str) + ); + } + else + { + /* + * NB: This will typically only be output in debug messages while + * retrying a partial IO.
+ */ + ereport(elevel, + errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes", + td->smgr.blockNum, + td->smgr.blockNum + td->smgr.nblocks - 1, + path.str, + result.result * (size_t) BLCKSZ, + td->smgr.nblocks * (size_t) BLCKSZ + ) + ); + } +} diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 4540284f5817c..abc9fdc88dcba 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -115,6 +115,11 @@ typedef struct f_smgr BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); + void (*smgr_startwritev) (PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); @@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = { .smgr_readv = mdreadv, .smgr_startreadv = mdstartreadv, .smgr_writev = mdwritev, + .smgr_startwritev = mdstartwritev, .smgr_writeback = mdwriteback, .smgr_nblocks = mdnblocks, .smgr_truncate = mdtruncate, @@ -795,6 +801,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, RESUME_INTERRUPTS(); } +/* + * smgrstartwritev() -- asynchronous version of smgrwritev() + * + * This starts an asynchronous writev IO using the IO handle `ioh`. Other than + * `ioh` all parameters are the same as smgrwritev(). + * + * Completion callbacks above smgr will be passed the result as the number of + * successfully written blocks if the write [partially] succeeds. This + * maintains the abstraction that smgr operates on the level of blocks, rather + * than bytes. + */ +void +smgrstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) +{ + HOLD_INTERRUPTS(); + smgrsw[reln->smgr_which].smgr_startwritev(ioh, + reln, forknum, blocknum, buffers, + nblocks, skipFsync); + RESUME_INTERRUPTS(); +} + /* * smgrwriteback() -- Trigger kernel writeback for the supplied range of * blocks. diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 7958ea11b7354..222e24bcb08bf 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -785,8 +785,12 @@ InitPostgres(const char *in_dbname, Oid dboid, * We don't yet have an aux-process resource owner, but StartupXLOG * and ShutdownXLOG will need one. Hence, create said resource owner * (and register a callback to clean it up after ShutdownXLOG runs). + * + * In bootstrap mode CreateAuxProcessResourceOwner() was already + * called in BootstrapModeMain(). 
*/ - CreateAuxProcessResourceOwner(); + if (!bootstrap) + CreateAuxProcessResourceOwner(); StartupXLOG(); /* Release (and warn about) any buffer pins leaked in StartupXLOG */ diff --git a/src/backend/utils/misc/guc_tables.c index 4eaeca89f2c7a..bd49b302293c4 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3293,6 +3293,19 @@ struct config_int ConfigureNamesInt[] = check_io_max_concurrency, NULL, NULL }, + { + {"io_bounce_buffers", + PGC_POSTMASTER, + RESOURCES_IO, + gettext_noop("Number of IO bounce buffers reserved for each backend."), + NULL, + GUC_UNIT_BLOCKS + }, + &io_bounce_buffers, + -1, -1, 4096, + NULL, NULL, NULL + }, + { {"io_workers", PGC_SIGHUP, diff --git a/src/backend/utils/misc/postgresql.conf.sample index ff56a1f0732c5..2c6456e907f54 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -211,6 +211,8 @@ # -1 sets based on shared_buffers # (change requires restart) #io_workers = 3 # 1-32; +#io_bounce_buffers = -1 # -1 sets based on shared_buffers + # (change requires restart) # - Worker Processes - diff --git a/src/backend/utils/resowner/resowner.c index d39f3e1b655cd..81e7e27965a6f 100644 --- a/src/backend/utils/resowner/resowner.c +++ b/src/backend/utils/resowner/resowner.c @@ -159,10 +159,11 @@ struct ResourceOwnerData LOCALLOCK *locks[MAX_RESOWNER_LOCKS]; /* list of owned locks */ /* - * AIO handles need be registered in critical sections and therefore - * cannot use the normal ResourceElem mechanism. + * AIO handles & bounce buffers need to be registered in critical sections + * and therefore cannot use the normal ResourceElem mechanism.
*/ dlist_head aio_handles; + dlist_head aio_bounce_buffers; }; @@ -434,6 +435,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name) } dlist_init(&owner->aio_handles); + dlist_init(&owner->aio_bounce_buffers); return owner; } @@ -742,6 +744,13 @@ ResourceOwnerReleaseInternal(ResourceOwner owner, pgaio_io_release_resowner(node, !isCommit); } + + while (!dlist_is_empty(&owner->aio_bounce_buffers)) + { + dlist_node *node = dlist_head_node(&owner->aio_bounce_buffers); + + pgaio_bounce_buffer_release_resowner(node, !isCommit); + } } else if (phase == RESOURCE_RELEASE_LOCKS) { @@ -1111,3 +1120,15 @@ { dlist_delete_from(&owner->aio_handles, ioh_node); } + +void +ResourceOwnerRememberAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node) +{ + dlist_push_tail(&owner->aio_bounce_buffers, bb_node); +} + +void +ResourceOwnerForgetAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node) +{ + dlist_delete_from(&owner->aio_bounce_buffers, bb_node); +} diff --git a/src/include/libpq/pqsignal.h index 5be7870156ee0..89276242fb4f5 100644 --- a/src/include/libpq/pqsignal.h +++ b/src/include/libpq/pqsignal.h @@ -13,7 +13,7 @@ #ifndef PQSIGNAL_H #define PQSIGNAL_H -#include <signal.h> +#include <signal.h> /* IWYU pragma: export */ #ifdef WIN32 /* Emulate POSIX sigset_t APIs on Windows */ diff --git a/src/include/postmaster/bgwriter.h index 800ecbfd13b31..a8081d411b68d 100644 --- a/src/include/postmaster/bgwriter.h +++ b/src/include/postmaster/bgwriter.h @@ -31,7 +31,8 @@ pg_noreturn extern void BackgroundWriterMain(const void *startup_data, size_t st pg_noreturn extern void CheckpointerMain(const void *startup_data, size_t startup_data_len); extern void RequestCheckpoint(int flags); -extern void CheckpointWriteDelay(int flags, double progress); +struct IOQueue; +extern void CheckpointWriteDelay(struct IOQueue *ioq, int flags, double progress); extern bool ForwardSyncRequest(const FileTag *ftag, SyncRequestType type); diff --git a/src/include/storage/aio.h index 9fe9d9ad9fa49..aa242676ccfdd 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -18,7 +18,7 @@ #ifndef AIO_H #define AIO_H -#include "storage/aio_types.h" +#include "storage/aio_types.h" /* IWYU pragma: export */ #include "storage/procnumber.h" @@ -194,13 +194,16 @@ typedef enum PgAioHandleCallbackID PGAIO_HCB_INVALID = 0, PGAIO_HCB_MD_READV, + PGAIO_HCB_MD_WRITEV, PGAIO_HCB_SHARED_BUFFER_READV, + PGAIO_HCB_SHARED_BUFFER_WRITEV, PGAIO_HCB_LOCAL_BUFFER_READV, + PGAIO_HCB_LOCAL_BUFFER_WRITEV, } PgAioHandleCallbackID; -#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV +#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_WRITEV StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS), "PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS"); @@ -290,7 +293,7 @@ extern ProcNumber pgaio_io_get_owner(PgAioHandle *ioh); extern void pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow); /* functions in aio_io.c */ -struct iovec; +struct iovec; /* IWYU pragma: export */ extern int pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov); extern PgAioOp pgaio_io_get_op(PgAioHandle *ioh); @@ -352,6 +355,20 @@ extern bool pgaio_have_staged(void); + + +/* -------------------------------------------------------------------------------- + * Bounce Buffers + * -------------------------------------------------------------------------------- + */ + +extern PgAioBounceBuffer
*pgaio_bounce_buffer_get(void); +extern void pgaio_io_assoc_bounce_buffer(PgAioHandle *ioh, PgAioBounceBuffer *bb); +extern uint32 pgaio_bounce_buffer_id(PgAioBounceBuffer *bb); +extern void pgaio_bounce_buffer_release(PgAioBounceBuffer *bb); +extern char *pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb); +extern void pgaio_bounce_buffer_release_resowner(struct dlist_node *bb_node, bool on_error); + + + /* -------------------------------------------------------------------------------- * Other * -------------------------------------------------------------------------------- @@ -364,6 +381,7 @@ extern void pgaio_closing_fd(int fd); /* GUCs */ extern PGDLLIMPORT int io_method; extern PGDLLIMPORT int io_max_concurrency; +extern PGDLLIMPORT int io_bounce_buffers; #endif /* AIO_H */ diff --git a/src/include/storage/aio_internal.h index 7f18da2c85651..8432ebbc67942 100644 --- a/src/include/storage/aio_internal.h +++ b/src/include/storage/aio_internal.h @@ -16,10 +16,12 @@ #define AIO_INTERNAL_H +/* IWYU pragma: begin_exports */ #include "lib/ilist.h" #include "port/pg_iovec.h" #include "storage/aio.h" #include "storage/condition_variable.h" +/* IWYU pragma: end_exports */ /* @@ -90,8 +92,11 @@ typedef enum PgAioHandleState } PgAioHandleState; +/* IWYU pragma: begin_exports */ struct ResourceOwnerData; +/* IWYU pragma: end_exports */ + /* typedef is in aio_types.h */ struct PgAioHandle { @@ -127,6 +132,12 @@ struct PgAioHandle /* raw result of the IO operation */ int32 result; + /* + * List of bounce buffers owned by this IO. It would suffice to use an + * index-based linked list here. + */ + slist_head bounce_buffers; + /** * In which list the handle is registered, depends on the state: * - IDLE, in per-backend list @@ -182,11 +193,24 @@ struct PgAioHandle }; +/* typedef is in aio_types.h */ +struct PgAioBounceBuffer +{ + slist_node node; + struct ResourceOwnerData *resowner; + dlist_node resowner_node; + char *buffer; +}; + + typedef struct PgAioBackend { /* index into PgAioCtl->io_handles */ uint32 io_handle_off; + /* index into PgAioCtl->bounce_buffers */ + uint32 bounce_buffers_off; + /* IO Handles that currently are not used */ dclist_head idle_ios; @@ -217,6 +241,12 @@ typedef struct PgAioBackend * IOs being appended at the end. */ dclist_head in_flight_ios; + + /* Bounce Buffers that currently are not used */ + slist_head idle_bbs; + + /* see handed_out_io */ + PgAioBounceBuffer *handed_out_bb; } PgAioBackend; @@ -244,6 +274,15 @@ typedef struct PgAioCtl uint32 io_handle_count; PgAioHandle *io_handles; + + /* + * Bounce buffers, for performing AIO on data that cannot be used in + * place (either because it is not in shared memory at all, or because we + * need to operate on a copy, as is e.g.
the case for writes when checksums are in use) + */ + uint32 bounce_buffers_count; + PgAioBounceBuffer *bounce_buffers; + char *bounce_buffers_data; } PgAioCtl; diff --git a/src/include/storage/aio_types.h b/src/include/storage/aio_types.h index 181833660778e..3c18dade49cb9 100644 --- a/src/include/storage/aio_types.h +++ b/src/include/storage/aio_types.h @@ -134,4 +134,6 @@ typedef struct PgAioReturn } PgAioReturn; +typedef struct PgAioBounceBuffer PgAioBounceBuffer; + #endif /* AIO_TYPES_H */ diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 0dec7d93b3b27..45c2b70b73602 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -21,6 +21,8 @@ #include "storage/buf.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" +#include "storage/io_queue.h" +#include "storage/latch.h" #include "storage/lwlock.h" #include "storage/procnumber.h" #include "storage/shmem.h" diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index f2192ceb2719b..89e0ca11288b4 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -174,7 +174,9 @@ extern PGDLLIMPORT int backend_flush_after; extern PGDLLIMPORT int bgwriter_flush_after; extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_shared_buffer_writev_cb; extern const PgAioHandleCallbacks aio_local_buffer_readv_cb; +extern const PgAioHandleCallbacks aio_local_buffer_writev_cb; /* in buf_init.c */ extern PGDLLIMPORT char *BufferBlocks; @@ -295,7 +297,8 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer); extern bool IsBufferCleanupOK(Buffer buffer); extern bool HoldingBufferPinThatDelaysRecovery(void); -extern bool BgBufferSync(struct WritebackContext *wb_context); +struct IOQueue; +extern bool BgBufferSync(struct IOQueue *ioq, struct WritebackContext *wb_context); extern uint32 GetPinLimit(void); extern uint32 GetLocalPinLimit(void); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index aeb67c498c59f..000a3ab23f905 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -507,5 +507,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, Item newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern bool PageNeedsChecksumCopy(Page page); #endif /* BUFPAGE_H */ diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h index b77d8e5e30e3a..2cc7c5a476175 100644 --- a/src/include/storage/fd.h +++ b/src/include/storage/fd.h @@ -112,6 +112,7 @@ extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); +extern int FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info); extern int FileSync(File file, uint32 wait_event_info); extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info); extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info); diff --git a/src/include/storage/io_queue.h b/src/include/storage/io_queue.h new file mode 100644 index 
0000000000000..92b1e9afe6f59 --- /dev/null +++ b/src/include/storage/io_queue.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * io_queue.h + * Mechanism for tracking many IOs + * + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/io_queue.h + * + *------------------------------------------------------------------------- + */ +#ifndef IO_QUEUE_H +#define IO_QUEUE_H + +#include "storage/aio_types.h" + +struct IOQueue; +typedef struct IOQueue IOQueue; + +struct PgAioWaitRef; + +extern IOQueue *io_queue_create(int depth, int flags); +extern void io_queue_track(IOQueue *ioq, const PgAioWaitRef *iow); +extern void io_queue_wait_one(IOQueue *ioq); +extern void io_queue_wait_all(IOQueue *ioq); +extern bool io_queue_is_empty(IOQueue *ioq); +extern void io_queue_reserve(IOQueue *ioq); +extern PgAioHandle *io_queue_acquire_io(IOQueue *ioq); +extern void io_queue_free(IOQueue *ioq); + +#endif /* IO_QUEUE_H */ diff --git a/src/include/storage/md.h index 9d7131eff4384..47ae6c36c94f7 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -21,6 +21,7 @@ #include "storage/sync.h" extern const PgAioHandleCallbacks aio_md_readv_cb; +extern const PgAioHandleCallbacks aio_md_writev_cb; /* md storage manager functionality */ extern void mdinit(void); @@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh, extern void mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); +extern void mdstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync); extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/include/storage/smgr.h index 856ebcda350e9..f00b3763ac99e 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync); +extern void smgrstartwritev(PgAioHandle *ioh, + SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, + bool skipFsync); extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/include/utils/elog.h index a5313c5d2d5ae..78cdd90c71807 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -14,7 +14,7 @@ #ifndef ELOG_H #define ELOG_H -#include <setjmp.h> +#include <setjmp.h> /* IWYU pragma: export */ #include "lib/stringinfo.h" @@ -76,7 +76,7 @@ struct Node; #define ERRCODE_IS_CATEGORY(ec) (((ec) & ~((1 << 12) - 1)) == 0) /* SQLSTATE codes for errors are defined in a separate file */ -#include "utils/errcodes.h" +#include "utils/errcodes.h" /* IWYU pragma: export */ /* * Provide a way to prevent "errno" from being accidentally used inside an diff --git a/src/include/utils/resowner.h index aede4bfc820a3..7e2ec2241693f 100644 --- a/src/include/utils/resowner.h +++ b/src/include/utils/resowner.h @@ -168,5 +168,7 @@ extern void
ResourceOwnerForgetLock(ResourceOwner owner, struct LOCALLOCK *local struct dlist_node; extern void ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node); extern void ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node); +extern void ResourceOwnerRememberAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node); +extern void ResourceOwnerForgetAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node); #endif /* RESOWNER_H */ diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e43..e2a8123516626 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -87,6 +87,27 @@ RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION bb_get_and_error() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_get_twice() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_get() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_get_release() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION bb_release_last() +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + + /* * Injection point related functions diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index bef0ecd90078f..d5ad374b3cef1 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -52,6 +52,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static PgAioHandle *last_handle; +static PgAioBounceBuffer *last_bb; @@ -671,6 +672,60 @@ batch_end(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(bb_get); +Datum +bb_get(PG_FUNCTION_ARGS) +{ + last_bb = pgaio_bounce_buffer_get(); + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(bb_release_last); +Datum +bb_release_last(PG_FUNCTION_ARGS) +{ + if (!last_bb) + elog(ERROR, "no bb"); + + pgaio_bounce_buffer_release(last_bb); + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(bb_get_and_error); +Datum +bb_get_and_error(PG_FUNCTION_ARGS) +{ + pgaio_bounce_buffer_get(); + + elog(ERROR, "as you command"); + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(bb_get_twice); +Datum +bb_get_twice(PG_FUNCTION_ARGS) +{ + pgaio_bounce_buffer_get(); + pgaio_bounce_buffer_get(); + + PG_RETURN_VOID(); +} + + +PG_FUNCTION_INFO_V1(bb_get_release); +Datum +bb_get_release(PG_FUNCTION_ARGS) +{ + PgAioBounceBuffer *bb; + + bb = pgaio_bounce_buffer_get(); + pgaio_bounce_buffer_release(bb); + + PG_RETURN_VOID(); +} + #ifdef USE_INJECTION_POINTS extern PGDLLEXPORT void inj_io_short_read(const char *name, const void *private_data); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b66cecd879910..e89c501fa8aff 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -349,6 +349,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BuffersToWrite BuildAccumulator BuiltinScript BulkInsertState @@ -1196,6 +1197,7 @@ IOContext IOFuncSelector IOObject IOOp +IOQueue IO_STATUS_BLOCK IPCompareMethod ITEM @@ -2137,6 +2139,7 @@ PermutationStep PermutationStepBlocker PermutationStepBlockerType PgAioBackend +PgAioBounceBuffer PgAioCtl PgAioHandle PgAioHandleCallbackID @@ -3021,6 +3024,7 @@ TocEntry TokenAuxData TokenizedAuthLine 
TrackItem +TrackedIO TransApplyAction TransInvalidationInfo TransState
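[Editor's note: the patch adds io_queue.h without a usage example in this excerpt. The following sketch, not part of the patch, shows the lifecycle assumed from the declarations in io_queue.h and the call sites above. have_more_work() and issue_one_write() are hypothetical stand-ins for a maintenance process's work loop.]

```c
static void
io_queue_lifecycle_sketch(void)
{
	/* size the queue for the desired number of concurrently tracked IOs */
	IOQueue    *ioq = io_queue_create(128 /* depth */ , 0 /* flags */ );

	while (have_more_work())
	{
		PgAioHandle *ioh = io_queue_acquire_io(ioq);
		PgAioWaitRef iow;

		pgaio_io_get_wref(ioh, &iow);
		issue_one_write(ioh);	/* e.g. via smgrstartwritev() */
		io_queue_track(ioq, &iow);
	}

	/* drain all tracked IOs before the queue goes away */
	io_queue_wait_all(ioq);
	Assert(io_queue_is_empty(ioq));
	io_queue_free(ioq);
}
```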