Skip to content

Commit 50cb750

Browse files
committed
aio: Implement support for reads in smgr/md/fd
This implements the following: 1) An smgr AIO target, for AIO on smgr files. This should be usable not just for md.c but also for other SMGR implementations, if we ever get them. 2) readv support in fd.c, which requires a small bit of infrastructure work in fd.c. 3) smgr.c and md.c support for readv. Nothing performs AIO yet, but as of this commit it would be possible. As part of this change, FileGetRawDesc() now actually ensures that the file is opened - previously it was basically not usable. It's used to reopen a file in IO workers. Reviewed-by: Noah Misch <noah@leadboat.com> Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
1 parent dee8002 commit 50cb750

File tree

10 files changed

+437
-4
lines changed

10 files changed

+437
-4
lines changed

src/backend/storage/aio/aio_callback.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "miscadmin.h"
1919
#include "storage/aio.h"
2020
#include "storage/aio_internal.h"
21+
#include "storage/md.h"
2122

2223

2324
/* just to have something to put into aio_handle_cbs */
@@ -37,6 +38,8 @@ typedef struct PgAioHandleCallbacksEntry
3738
/*
 * Table of AIO handle callbacks, indexed by callback ID. Each entry stores a
 * pointer to the callback struct together with the struct's name as a string.
 */
static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
3839
/* CALLBACK_ENTRY places "callback" at index "id" and stringifies its name */
#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
3940
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
41+
42+
/* callbacks for reads started by md.c */
CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
4043
#undef CALLBACK_ENTRY
4144
};
4245

src/backend/storage/aio/aio_target.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "storage/aio.h"
1818
#include "storage/aio_internal.h"
19+
#include "storage/smgr.h"
1920

2021

2122
/*
@@ -25,6 +26,7 @@ static const PgAioTargetInfo *pgaio_target_info[] = {
2526
[PGAIO_TID_INVALID] = &(PgAioTargetInfo) {
2627
.name = "invalid",
2728
},
29+
[PGAIO_TID_SMGR] = &aio_smgr_target_info,
2830
};
2931

3032

src/backend/storage/file/fd.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
#include "miscadmin.h"
9595
#include "pgstat.h"
9696
#include "postmaster/startup.h"
97+
#include "storage/aio.h"
9798
#include "storage/fd.h"
9899
#include "storage/ipc.h"
99100
#include "utils/guc.h"
@@ -1296,6 +1297,8 @@ LruDelete(File file)
12961297

12971298
vfdP = &VfdCache[file];
12981299

1300+
pgaio_closing_fd(vfdP->fd);
1301+
12991302
/*
13001303
* Close the file. We aren't expecting this to fail; if it does, better
13011304
* to leak the FD than to mess up our internal state.
@@ -1989,6 +1992,8 @@ FileClose(File file)
19891992

19901993
if (!FileIsNotOpen(file))
19911994
{
1995+
pgaio_closing_fd(vfdP->fd);
1996+
19921997
/* close the file */
19931998
if (close(vfdP->fd) != 0)
19941999
{
@@ -2212,6 +2217,32 @@ FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset,
22122217
return returnCode;
22132218
}
22142219

2220+
int
2221+
FileStartReadV(PgAioHandle *ioh, File file,
2222+
int iovcnt, off_t offset,
2223+
uint32 wait_event_info)
2224+
{
2225+
int returnCode;
2226+
Vfd *vfdP;
2227+
2228+
Assert(FileIsValid(file));
2229+
2230+
DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d",
2231+
file, VfdCache[file].fileName,
2232+
(int64) offset,
2233+
iovcnt));
2234+
2235+
returnCode = FileAccess(file);
2236+
if (returnCode < 0)
2237+
return returnCode;
2238+
2239+
vfdP = &VfdCache[file];
2240+
2241+
pgaio_io_start_readv(ioh, vfdP->fd, iovcnt, offset);
2242+
2243+
return 0;
2244+
}
2245+
22152246
ssize_t
22162247
FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
22172248
uint32 wait_event_info)
@@ -2500,6 +2531,12 @@ FilePathName(File file)
25002531
int
25012532
FileGetRawDesc(File file)
25022533
{
2534+
int returnCode;
2535+
2536+
returnCode = FileAccess(file);
2537+
if (returnCode < 0)
2538+
return returnCode;
2539+
25032540
Assert(FileIsValid(file));
25042541
return VfdCache[file].fd;
25052542
}
@@ -2780,6 +2817,7 @@ FreeDesc(AllocateDesc *desc)
27802817
result = closedir(desc->desc.dir);
27812818
break;
27822819
case AllocateDescRawFD:
2820+
pgaio_closing_fd(desc->desc.fd);
27832821
result = close(desc->desc.fd);
27842822
break;
27852823
default:
@@ -2848,6 +2886,8 @@ CloseTransientFile(int fd)
28482886
/* Only get here if someone passes us a file not in allocatedDescs */
28492887
elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
28502888

2889+
pgaio_closing_fd(fd);
2890+
28512891
return close(fd);
28522892
}
28532893

src/backend/storage/smgr/md.c

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "miscadmin.h"
3232
#include "pg_trace.h"
3333
#include "pgstat.h"
34+
#include "storage/aio.h"
3435
#include "storage/bufmgr.h"
3536
#include "storage/fd.h"
3637
#include "storage/md.h"
@@ -152,6 +153,15 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum,
152153
static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
153154
MdfdVec *seg);
154155

156+
/* Callbacks for reads started by mdstartreadv(); defined at the end of the file */
static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
157+
static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
158+
159+
/*
 * AIO handle callbacks for md.c reads: md_readv_complete() is installed as
 * the complete_shared callback and md_readv_report() as the report callback.
 * mdstartreadv() registers them under PGAIO_HCB_MD_READV.
 */
const PgAioHandleCallbacks aio_md_readv_cb = {
160+
.complete_shared = md_readv_complete,
161+
.report = md_readv_report,
162+
};
163+
164+
155165
static inline int
156166
_mdfd_open_flags(void)
157167
{
@@ -937,6 +947,69 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
937947
}
938948
}
939949

950+
/*
951+
* mdstartreadv() -- Asynchronous version of mdreadv().
952+
*/
953+
void
954+
mdstartreadv(PgAioHandle *ioh,
955+
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
956+
void **buffers, BlockNumber nblocks)
957+
{
958+
off_t seekpos;
959+
MdfdVec *v;
960+
BlockNumber nblocks_this_segment;
961+
struct iovec *iov;
962+
int iovcnt;
963+
int ret;
964+
965+
v = _mdfd_getseg(reln, forknum, blocknum, false,
966+
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
967+
968+
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
969+
970+
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
971+
972+
nblocks_this_segment =
973+
Min(nblocks,
974+
RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
975+
976+
if (nblocks_this_segment != nblocks)
977+
elog(ERROR, "read crossing segment boundary");
978+
979+
iovcnt = pgaio_io_get_iovec(ioh, &iov);
980+
981+
Assert(nblocks <= iovcnt);
982+
983+
iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment);
984+
985+
Assert(iovcnt <= nblocks_this_segment);
986+
987+
if (!(io_direct_flags & IO_DIRECT_DATA))
988+
pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
989+
990+
pgaio_io_set_target_smgr(ioh,
991+
reln,
992+
forknum,
993+
blocknum,
994+
nblocks,
995+
false);
996+
pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_READV, 0);
997+
998+
ret = FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ);
999+
if (ret != 0)
1000+
ereport(ERROR,
1001+
(errcode_for_file_access(),
1002+
errmsg("could not start reading blocks %u..%u in file \"%s\": %m",
1003+
blocknum,
1004+
blocknum + nblocks_this_segment - 1,
1005+
FilePathName(v->mdfd_vfd))));
1006+
1007+
/*
1008+
* The error checks corresponding to the post-read checks in mdreadv() are
1009+
* in md_readv_complete().
1010+
*/
1011+
}
1012+
9401013
/*
9411014
* mdwritev() -- Write the supplied blocks at the appropriate location.
9421015
*
@@ -1365,6 +1438,21 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
13651438
}
13661439
}
13671440

1441+
int
1442+
mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
1443+
{
1444+
MdfdVec *v = mdopenfork(reln, forknum, EXTENSION_FAIL);
1445+
1446+
v = _mdfd_getseg(reln, forknum, blocknum, false,
1447+
EXTENSION_FAIL);
1448+
1449+
*off = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
1450+
1451+
Assert(*off < (off_t) BLCKSZ * RELSEG_SIZE);
1452+
1453+
return FileGetRawDesc(v->mdfd_vfd);
1454+
}
1455+
13681456
/*
13691457
* register_dirty_segment() -- Mark a relation segment as needing fsync
13701458
*
@@ -1841,3 +1929,111 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
18411929
*/
18421930
return ftag->rlocator.dbOid == candidate->rlocator.dbOid;
18431931
}
1932+
1933+
/*
1934+
* AIO completion callback for mdstartreadv().
1935+
*/
1936+
static PgAioResult
1937+
md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
1938+
{
1939+
PgAioTargetData *td = pgaio_io_get_target_data(ioh);
1940+
PgAioResult result = prior_result;
1941+
1942+
if (prior_result.result < 0)
1943+
{
1944+
result.status = PGAIO_RS_ERROR;
1945+
result.id = PGAIO_HCB_MD_READV;
1946+
/* For "hard" errors, track the error number in error_data */
1947+
result.error_data = -prior_result.result;
1948+
result.result = 0;
1949+
1950+
/*
1951+
* Immediately log a message about the IO error, but only to the
1952+
* server log. The reason to do so immediately is that the originator
1953+
* might not process the query result immediately (because it is busy
1954+
* doing another part of query processing) or at all (e.g. if it was
1955+
* cancelled or errored out due to another IO also failing). The
1956+
* issuer of the IO will emit an ERROR when processing the IO's
1957+
* results
1958+
*/
1959+
pgaio_result_report(result, td, LOG_SERVER_ONLY);
1960+
1961+
return result;
1962+
}
1963+
1964+
/*
1965+
* As explained above smgrstartreadv(), the smgr API operates on the level
1966+
* of blocks, rather than bytes. Convert.
1967+
*/
1968+
result.result /= BLCKSZ;
1969+
1970+
Assert(result.result <= td->smgr.nblocks);
1971+
1972+
if (result.result == 0)
1973+
{
1974+
/* consider 0 blocks read a failure */
1975+
result.status = PGAIO_RS_ERROR;
1976+
result.id = PGAIO_HCB_MD_READV;
1977+
result.error_data = 0;
1978+
1979+
/* see comment above the "hard error" case */
1980+
pgaio_result_report(result, td, LOG_SERVER_ONLY);
1981+
1982+
return result;
1983+
}
1984+
1985+
if (result.status != PGAIO_RS_ERROR &&
1986+
result.result < td->smgr.nblocks)
1987+
{
1988+
/* partial reads should be retried at upper level */
1989+
result.status = PGAIO_RS_PARTIAL;
1990+
result.id = PGAIO_HCB_MD_READV;
1991+
}
1992+
1993+
return result;
1994+
}
1995+
1996+
/*
1997+
* AIO error reporting callback for mdstartreadv().
1998+
*
1999+
* Errors are encoded as follows:
2000+
* - PgAioResult.error_data != 0 encodes IO that failed with that errno
2001+
* - PgAioResult.error_data == 0 encodes IO that didn't read all data
2002+
*/
2003+
static void
2004+
md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
2005+
{
2006+
RelPathStr path;
2007+
2008+
path = relpathbackend(td->smgr.rlocator,
2009+
td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
2010+
td->smgr.forkNum);
2011+
2012+
if (result.error_data != 0)
2013+
{
2014+
/* for errcode_for_file_access() and %m */
2015+
errno = result.error_data;
2016+
2017+
ereport(elevel,
2018+
errcode_for_file_access(),
2019+
errmsg("could not read blocks %u..%u in file \"%s\": %m",
2020+
td->smgr.blockNum,
2021+
td->smgr.blockNum + td->smgr.nblocks - 1,
2022+
path.str));
2023+
}
2024+
else
2025+
{
2026+
/*
2027+
* NB: This will typically only be output in debug messages, while
2028+
* retrying a partial IO.
2029+
*/
2030+
ereport(elevel,
2031+
errcode(ERRCODE_DATA_CORRUPTED),
2032+
errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
2033+
td->smgr.blockNum,
2034+
td->smgr.blockNum + td->smgr.nblocks - 1,
2035+
path.str,
2036+
result.result * (size_t) BLCKSZ,
2037+
td->smgr.nblocks * (size_t) BLCKSZ));
2038+
}
2039+
}

0 commit comments

Comments
 (0)