@@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
155
155
156
156
/* AIO completion and error-reporting callbacks for mdstartreadv(). */
static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);

/* AIO completion and error-reporting callbacks for mdstartwritev(). */
static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);

/* Callback set registered for PGAIO_HCB_MD_READV handles. */
const PgAioHandleCallbacks aio_md_readv_cb = {
	.complete_shared = md_readv_complete,
	.report = md_readv_report,
};

/* Callback set registered for PGAIO_HCB_MD_WRITEV handles. */
const PgAioHandleCallbacks aio_md_writev_cb = {
	.complete_shared = md_writev_complete,
	.report = md_writev_report,
};
static inline int
166
173
_mdfd_open_flags (void )
@@ -1115,6 +1122,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
1115
1122
}
1116
1123
}
1117
1124
1125
/*
 *	mdstartwritev() -- Asynchronous version of mdwritev().
 *
 * Issues an asynchronous write of 'nblocks' blocks from 'buffers', starting
 * at block 'blocknum' of the given fork, using the already-acquired AIO
 * handle 'ioh'.  The write must fit within a single segment; a request
 * crossing a segment boundary is rejected with ERROR.  'skipFsync' is
 * recorded in the IO's target data and honored by md_writev_complete().
 */
void
mdstartwritev(PgAioHandle *ioh,
			  SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
			  const void **buffers, BlockNumber nblocks, bool skipFsync)
{
	off_t		seekpos;
	MdfdVec    *v;
	BlockNumber nblocks_this_segment;
	struct iovec *iov;
	int			iovcnt;
	int			ret;

	v = _mdfd_getseg(reln, forknum, blocknum, false,
					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);

	/* byte offset of 'blocknum' within its segment file */
	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));

	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);

	/* how many of the requested blocks fall into this segment */
	nblocks_this_segment =
		Min(nblocks,
			RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));

	/* unlike mdwritev(), this API does not loop over segments */
	if (nblocks_this_segment != nblocks)
		elog(ERROR, "write crossing segment boundary");

	iovcnt = pgaio_io_get_iovec(ioh, &iov);

	Assert(nblocks <= iovcnt);

	/* buffers_to_iovec() may merge adjacent buffers, so iovcnt can shrink */
	iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);

	Assert(iovcnt <= nblocks_this_segment);

	if (!(io_direct_flags & IO_DIRECT_DATA))
		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);

	/* attach target info and callbacks before handing the IO off */
	pgaio_io_set_target_smgr(ioh,
							 reln,
							 forknum,
							 blocknum,
							 nblocks,
							 skipFsync);
	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0);

	ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
	if (ret != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not start writing blocks %u..%u in file \"%s\": %m",
						blocknum,
						blocknum + nblocks_this_segment - 1,
						FilePathName(v->mdfd_vfd))));
}
1118
1183
1119
1184
/*
1120
1185
* mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1503,6 +1568,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
1503
1568
}
1504
1569
}
1505
1570
1571
+ /*
1572
+ * Like register_dirty_segment(), except for use by AIO. In the completion
1573
+ * callback we don't have access to the MdfdVec (the completion callback might
1574
+ * be executed in a different backend than the issuing backend), therefore we
1575
+ * have to implement this slightly differently.
1576
+ */
1577
+ static void
1578
+ register_dirty_segment_aio (RelFileLocator locator , ForkNumber forknum , uint64 segno )
1579
+ {
1580
+ FileTag tag ;
1581
+
1582
+ INIT_MD_FILETAG (tag , locator , forknum , segno );
1583
+
1584
+ /*
1585
+ * Can't block here waiting for checkpointer to accept our sync request,
1586
+ * as checkpointer might be waiting for this AIO to finish if offloaded to
1587
+ * a worker.
1588
+ */
1589
+ if (!RegisterSyncRequest (& tag , SYNC_REQUEST , false /* retryOnError */ ))
1590
+ {
1591
+ char path [MAXPGPATH ];
1592
+
1593
+ ereport (DEBUG1 ,
1594
+ (errmsg_internal ("could not forward fsync request because request queue is full" )));
1595
+
1596
+ /* reuse mdsyncfiletag() to avoid duplicating code */
1597
+ if (mdsyncfiletag (& tag , path ))
1598
+ ereport (data_sync_elevel (ERROR ),
1599
+ (errcode_for_file_access (),
1600
+ errmsg ("could not fsync file \"%s\": %m" ,
1601
+ path )));
1602
+ }
1603
+ }
1604
+
1506
1605
/*
1507
1606
* register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
1508
1607
*/
@@ -2037,3 +2136,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
2037
2136
td -> smgr .nblocks * (size_t ) BLCKSZ ));
2038
2137
}
2039
2138
}
2139
+
2140
/*
 * AIO completion callback for mdstartwritev().
 *
 * Translates the raw IO result (bytes, or negative errno) into smgr-level
 * terms (blocks written), reports hard errors and zero-length writes, marks
 * partial writes for retry by the caller, and forwards the fsync request
 * unless the issuer asked to skip it.
 */
static PgAioResult
md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
{
	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
	PgAioResult result = prior_result;

	if (prior_result.result < 0)
	{
		result.status = PGAIO_RS_ERROR;
		result.id = PGAIO_HCB_MD_WRITEV;
		/* For "hard" errors, track the error number in error_data */
		result.error_data = -prior_result.result;
		result.result = 0;

		pgaio_result_report(result, td, LOG);

		return result;
	}

	/*
	 * As explained above smgrstartwritev(), the smgr API operates on the
	 * level of blocks, rather than bytes. Convert.
	 */
	result.result /= BLCKSZ;

	Assert(result.result <= td->smgr.nblocks);

	if (result.result == 0)
	{
		/* consider 0 blocks written a failure */
		result.status = PGAIO_RS_ERROR;
		result.id = PGAIO_HCB_MD_WRITEV;
		/* error_data == 0 distinguishes this from a hard errno failure */
		result.error_data = 0;

		pgaio_result_report(result, td, LOG);

		return result;
	}

	if (result.status != PGAIO_RS_ERROR &&
		result.result < td->smgr.nblocks)
	{
		/* partial writes should be retried at upper level */
		result.status = PGAIO_RS_PARTIAL;
		result.id = PGAIO_HCB_MD_WRITEV;
	}

	/* some blocks were written; schedule the segment for sync if requested */
	if (!td->smgr.skip_fsync)
		register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum,
								   td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE));

	return result;
}
2196
+
2197
+ /*
2198
+ * AIO error reporting callback for mdstartwritev().
2199
+ */
2200
+ static void
2201
+ md_writev_report (PgAioResult result , const PgAioTargetData * td , int elevel )
2202
+ {
2203
+ RelPathStr path ;
2204
+
2205
+ path = relpathbackend (td -> smgr .rlocator ,
2206
+ td -> smgr .is_temp ? MyProcNumber : INVALID_PROC_NUMBER ,
2207
+ td -> smgr .forkNum );
2208
+
2209
+ if (result .error_data != 0 )
2210
+ {
2211
+ errno = result .error_data ; /* for errcode_for_file_access() */
2212
+
2213
+ ereport (elevel ,
2214
+ errcode_for_file_access (),
2215
+ errmsg ("could not write blocks %u..%u in file \"%s\": %m" ,
2216
+ td -> smgr .blockNum ,
2217
+ td -> smgr .blockNum + td -> smgr .nblocks ,
2218
+ path .str )
2219
+ );
2220
+ }
2221
+ else
2222
+ {
2223
+ /*
2224
+ * NB: This will typically only be output in debug messages, while
2225
+ * retrying a partial IO.
2226
+ */
2227
+ ereport (elevel ,
2228
+ errcode (ERRCODE_DATA_CORRUPTED ),
2229
+ errmsg ("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes" ,
2230
+ td -> smgr .blockNum ,
2231
+ td -> smgr .blockNum + td -> smgr .nblocks - 1 ,
2232
+ path .str ,
2233
+ result .result * (size_t ) BLCKSZ ,
2234
+ td -> smgr .nblocks * (size_t ) BLCKSZ
2235
+ )
2236
+ );
2237
+ }
2238
+ }
0 commit comments