Skip to content

Commit e5fe570

Browse files
committed
aio: Implement smgr/md/fd write support
TODO: - Right now the sync.c integration with smgr.c/md.c isn't properly safe to use in a critical section The only reason it doesn't immediately fail is that it's reasonably rare that RegisterSyncRequest() fails *and* either: - smgropen()->hash_search(HASH_ENTER) decides to resize the hash table, even though the lookup is guaranteed to succeed for io_method=worker. - an io_method=uring completion is run in a different backend and smgropen() needs to build a new entry and thus needs to allocate memory For a bit I thought this could be worked around easily enough by not doing an smgropen() in mdsyncfiletag(), or adding a "fallible" smgropen() and instead just opening the file directly. That actually does kinda solve the problem, but only because the memory allocation in PathNameOpenFile() uses malloc(), not palloc() and thus doesn't trigger - temp_file_limit implementation
1 parent 4ce424d commit e5fe570

File tree

8 files changed

+269
-0
lines changed

8 files changed

+269
-0
lines changed

src/backend/storage/aio/aio_callback.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
4141
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
4242

4343
CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
44+
CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
4445

4546
CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
4647

src/backend/storage/file/fd.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,6 +2348,34 @@ FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
23482348
return returnCode;
23492349
}
23502350

2351+
int
2352+
FileStartWriteV(PgAioHandle *ioh, File file,
2353+
int iovcnt, off_t offset,
2354+
uint32 wait_event_info)
2355+
{
2356+
int returnCode;
2357+
Vfd *vfdP;
2358+
2359+
Assert(FileIsValid(file));
2360+
2361+
DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d",
2362+
file, VfdCache[file].fileName,
2363+
(int64) offset,
2364+
iovcnt));
2365+
2366+
returnCode = FileAccess(file);
2367+
if (returnCode < 0)
2368+
return returnCode;
2369+
2370+
vfdP = &VfdCache[file];
2371+
2372+
/* FIXME: think about / reimplement temp_file_limit */
2373+
2374+
pgaio_io_start_writev(ioh, vfdP->fd, iovcnt, offset);
2375+
2376+
return 0;
2377+
}
2378+
23512379
int
23522380
FileSync(File file, uint32 wait_event_info)
23532381
{

src/backend/storage/smgr/md.c

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
155155

156156
static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
157157
static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
158+
static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
159+
static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
158160

159161
const PgAioHandleCallbacks aio_md_readv_cb = {
160162
.complete_shared = md_readv_complete,
161163
.report = md_readv_report,
162164
};
163165

166+
const PgAioHandleCallbacks aio_md_writev_cb = {
167+
.complete_shared = md_writev_complete,
168+
.report = md_writev_report,
169+
};
170+
164171

165172
static inline int
166173
_mdfd_open_flags(void)
@@ -1115,6 +1122,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
11151122
}
11161123
}
11171124

1125+
/*
1126+
* mdstartwritev() -- Asynchronous version of mdrwritev().
1127+
*/
1128+
void
1129+
mdstartwritev(PgAioHandle *ioh,
1130+
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
1131+
const void **buffers, BlockNumber nblocks, bool skipFsync)
1132+
{
1133+
off_t seekpos;
1134+
MdfdVec *v;
1135+
BlockNumber nblocks_this_segment;
1136+
struct iovec *iov;
1137+
int iovcnt;
1138+
int ret;
1139+
1140+
v = _mdfd_getseg(reln, forknum, blocknum, false,
1141+
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
1142+
1143+
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
1144+
1145+
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
1146+
1147+
nblocks_this_segment =
1148+
Min(nblocks,
1149+
RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
1150+
1151+
if (nblocks_this_segment != nblocks)
1152+
elog(ERROR, "write crossing segment boundary");
1153+
1154+
iovcnt = pgaio_io_get_iovec(ioh, &iov);
1155+
1156+
Assert(nblocks <= iovcnt);
1157+
1158+
iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);
1159+
1160+
Assert(iovcnt <= nblocks_this_segment);
1161+
1162+
if (!(io_direct_flags & IO_DIRECT_DATA))
1163+
pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
1164+
1165+
pgaio_io_set_target_smgr(ioh,
1166+
reln,
1167+
forknum,
1168+
blocknum,
1169+
nblocks,
1170+
skipFsync);
1171+
pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0);
1172+
1173+
ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
1174+
if (ret != 0)
1175+
ereport(ERROR,
1176+
(errcode_for_file_access(),
1177+
errmsg("could not start writing blocks %u..%u in file \"%s\": %m",
1178+
blocknum,
1179+
blocknum + nblocks_this_segment - 1,
1180+
FilePathName(v->mdfd_vfd))));
1181+
}
1182+
11181183

11191184
/*
11201185
* mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1503,6 +1568,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
15031568
}
15041569
}
15051570

1571+
/*
1572+
* Like register_dirty_segment(), except for use by AIO. In the completion
1573+
* callback we don't have access to the MdfdVec (the completion callback might
1574+
* be executed in a different backend than the issuing backend), therefore we
1575+
* have to implement this slightly differently.
1576+
*/
1577+
static void
1578+
register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno)
1579+
{
1580+
FileTag tag;
1581+
1582+
INIT_MD_FILETAG(tag, locator, forknum, segno);
1583+
1584+
/*
1585+
* Can't block here waiting for checkpointer to accept our sync request,
1586+
* as checkpointer might be waiting for this AIO to finish if offloaded to
1587+
* a worker.
1588+
*/
1589+
if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ ))
1590+
{
1591+
char path[MAXPGPATH];
1592+
1593+
ereport(DEBUG1,
1594+
(errmsg_internal("could not forward fsync request because request queue is full")));
1595+
1596+
/* reuse mdsyncfiletag() to avoid duplicating code */
1597+
if (mdsyncfiletag(&tag, path))
1598+
ereport(data_sync_elevel(ERROR),
1599+
(errcode_for_file_access(),
1600+
errmsg("could not fsync file \"%s\": %m",
1601+
path)));
1602+
}
1603+
}
1604+
15061605
/*
15071606
* register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
15081607
*/
@@ -2037,3 +2136,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
20372136
td->smgr.nblocks * (size_t) BLCKSZ));
20382137
}
20392138
}
2139+
2140+
/*
2141+
* AIO completion callback for mdstartwritev().
2142+
*/
2143+
static PgAioResult
2144+
md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
2145+
{
2146+
PgAioTargetData *td = pgaio_io_get_target_data(ioh);
2147+
PgAioResult result = prior_result;
2148+
2149+
if (prior_result.result < 0)
2150+
{
2151+
result.status = PGAIO_RS_ERROR;
2152+
result.id = PGAIO_HCB_MD_WRITEV;
2153+
/* For "hard" errors, track the error number in error_data */
2154+
result.error_data = -prior_result.result;
2155+
result.result = 0;
2156+
2157+
pgaio_result_report(result, td, LOG);
2158+
2159+
return result;
2160+
}
2161+
2162+
/*
2163+
* As explained above smgrstartwritev(), the smgr API operates on the
2164+
* level of blocks, rather than bytes. Convert.
2165+
*/
2166+
result.result /= BLCKSZ;
2167+
2168+
Assert(result.result <= td->smgr.nblocks);
2169+
2170+
if (result.result == 0)
2171+
{
2172+
/* consider 0 blocks written a failure */
2173+
result.status = PGAIO_RS_ERROR;
2174+
result.id = PGAIO_HCB_MD_WRITEV;
2175+
result.error_data = 0;
2176+
2177+
pgaio_result_report(result, td, LOG);
2178+
2179+
return result;
2180+
}
2181+
2182+
if (result.status != PGAIO_RS_ERROR &&
2183+
result.result < td->smgr.nblocks)
2184+
{
2185+
/* partial writes should be retried at upper level */
2186+
result.status = PGAIO_RS_PARTIAL;
2187+
result.id = PGAIO_HCB_MD_WRITEV;
2188+
}
2189+
2190+
if (!td->smgr.skip_fsync)
2191+
register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum,
2192+
td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE));
2193+
2194+
return result;
2195+
}
2196+
2197+
/*
2198+
* AIO error reporting callback for mdstartwritev().
2199+
*/
2200+
static void
2201+
md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel)
2202+
{
2203+
RelPathStr path;
2204+
2205+
path = relpathbackend(td->smgr.rlocator,
2206+
td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
2207+
td->smgr.forkNum);
2208+
2209+
if (result.error_data != 0)
2210+
{
2211+
errno = result.error_data; /* for errcode_for_file_access() */
2212+
2213+
ereport(elevel,
2214+
errcode_for_file_access(),
2215+
errmsg("could not write blocks %u..%u in file \"%s\": %m",
2216+
td->smgr.blockNum,
2217+
td->smgr.blockNum + td->smgr.nblocks,
2218+
path.str)
2219+
);
2220+
}
2221+
else
2222+
{
2223+
/*
2224+
* NB: This will typically only be output in debug messages, while
2225+
* retrying a partial IO.
2226+
*/
2227+
ereport(elevel,
2228+
errcode(ERRCODE_DATA_CORRUPTED),
2229+
errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes",
2230+
td->smgr.blockNum,
2231+
td->smgr.blockNum + td->smgr.nblocks - 1,
2232+
path.str,
2233+
result.result * (size_t) BLCKSZ,
2234+
td->smgr.nblocks * (size_t) BLCKSZ
2235+
)
2236+
);
2237+
}
2238+
}

src/backend/storage/smgr/smgr.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ typedef struct f_smgr
115115
BlockNumber blocknum,
116116
const void **buffers, BlockNumber nblocks,
117117
bool skipFsync);
118+
void (*smgr_startwritev) (PgAioHandle *ioh,
119+
SMgrRelation reln, ForkNumber forknum,
120+
BlockNumber blocknum,
121+
const void **buffers, BlockNumber nblocks,
122+
bool skipFsync);
118123
void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
119124
BlockNumber blocknum, BlockNumber nblocks);
120125
BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = {
142147
.smgr_readv = mdreadv,
143148
.smgr_startreadv = mdstartreadv,
144149
.smgr_writev = mdwritev,
150+
.smgr_startwritev = mdstartwritev,
145151
.smgr_writeback = mdwriteback,
146152
.smgr_nblocks = mdnblocks,
147153
.smgr_truncate = mdtruncate,
@@ -787,6 +793,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
787793
RESUME_INTERRUPTS();
788794
}
789795

796+
/*
797+
* smgrstartwritev() -- asynchronous version of smgrwritev()
798+
*
799+
* This starts an asynchronous writev IO using the IO handle `ioh`. Other than
800+
* `ioh` all parameters are the same as smgrwritev().
801+
*
802+
* Completion callbacks above smgr will be passed the result as the number of
803+
* successfully written blocks if the write [partially] succeeds. This
804+
* maintains the abstraction that smgr operates on the level of blocks, rather
805+
* than bytes.
806+
*/
807+
void
808+
smgrstartwritev(PgAioHandle *ioh,
809+
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
810+
const void **buffers, BlockNumber nblocks, bool skipFsync)
811+
{
812+
HOLD_INTERRUPTS();
813+
smgrsw[reln->smgr_which].smgr_startwritev(ioh,
814+
reln, forknum, blocknum, buffers,
815+
nblocks, skipFsync);
816+
RESUME_INTERRUPTS();
817+
}
818+
790819
/*
791820
* smgrwriteback() -- Trigger kernel writeback for the supplied range of
792821
* blocks.

src/include/storage/aio.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ typedef enum PgAioHandleCallbackID
194194
PGAIO_HCB_INVALID = 0,
195195

196196
PGAIO_HCB_MD_READV,
197+
PGAIO_HCB_MD_WRITEV,
197198

198199
PGAIO_HCB_SHARED_BUFFER_READV,
199200

src/include/storage/fd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event
112112
extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
113113
extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
114114
extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
115+
extern int FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
115116
extern int FileSync(File file, uint32 wait_event_info);
116117
extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
117118
extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);

src/include/storage/md.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "storage/sync.h"
2222

2323
extern const PgAioHandleCallbacks aio_md_readv_cb;
24+
extern const PgAioHandleCallbacks aio_md_writev_cb;
2425

2526
/* md storage manager functionality */
2627
extern void mdinit(void);
@@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh,
4546
extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
4647
BlockNumber blocknum,
4748
const void **buffers, BlockNumber nblocks, bool skipFsync);
49+
extern void mdstartwritev(PgAioHandle *ioh,
50+
SMgrRelation reln, ForkNumber forknum,
51+
BlockNumber blocknum,
52+
const void **buffers, BlockNumber nblocks, bool skipFsync);
4853
extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
4954
BlockNumber blocknum, BlockNumber nblocks);
5055
extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);

src/include/storage/smgr.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
108108
BlockNumber blocknum,
109109
const void **buffers, BlockNumber nblocks,
110110
bool skipFsync);
111+
extern void smgrstartwritev(PgAioHandle *ioh,
112+
SMgrRelation reln, ForkNumber forknum,
113+
BlockNumber blocknum,
114+
const void **buffers, BlockNumber nblocks,
115+
bool skipFsync);
111116
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
112117
BlockNumber blocknum, BlockNumber nblocks);
113118
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);

0 commit comments

Comments
 (0)