Commit 6c271ce

io_uring: add submission polling
This enables an application to do IO, without ever entering the kernel. By
using the SQ ring to fill in new sqes and watching for completions on the CQ
ring, we can submit and reap IOs without doing a single system call. The
kernel side thread will poll for new submissions, and in case of HIPRI/polled
IO, it'll also poll for completions.

By default, we allow 1 second of active spinning. This can be changed by
passing in a different grace period at io_uring_setup(2) time. If the thread
exceeds this idle time without having any work to do, it will set:

    sq_ring->flags |= IORING_SQ_NEED_WAKEUP;

The application will have to call io_uring_enter() to start things back up
again. If IO is kept busy, that will never be needed. Basically, an
application that has this feature enabled will guard its io_uring_enter(2)
call with:

    read_barrier();
    if (*sq_ring->flags & IORING_SQ_NEED_WAKEUP)
            io_uring_enter(fd, 0, 0, IORING_ENTER_SQ_WAKEUP);

instead of calling it unconditionally.

It's mandatory to use fixed files with this feature. Failure to do so will
result in the application getting an -EBADF CQ entry when submitting IO.

Reviewed-by: Hannes Reinecke <hare@suse.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
1 parent 6b06314 commit 6c271ce
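
As a rough illustration of the userspace side of this protocol (not part of the
patch), a submit path on a ring created with IORING_SETUP_SQPOLL might look like
the sketch below. It assumes the application has mmap'ed the SQ ring and keeps
pointers to its fields in a hypothetical struct app_sq_ring, that read_barrier()
and write_barrier() are the usual memory-barrier helpers from the io_uring
examples, and that io_uring_enter() is a thin syscall wrapper as used in the
commit message above.

/* Hypothetical application-side view of the mmap'ed SQ ring. */
struct app_sq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *flags;	/* kernel sets IORING_SQ_NEED_WAKEUP here */
	unsigned *array;	/* indices into the SQE array */
};

/*
 * Queue one already-prepared SQE (at sqe_index) and only enter the kernel
 * if the SQ poll thread has gone idle and asked to be woken up.
 */
static void app_submit(struct app_sq_ring *sq, int ring_fd, unsigned sqe_index)
{
	unsigned tail = *sq->tail;

	sq->array[tail & *sq->ring_mask] = sqe_index;
	write_barrier();	/* publish the SQE before bumping the tail */
	*sq->tail = tail + 1;
	write_barrier();

	read_barrier();
	if (*sq->flags & IORING_SQ_NEED_WAKEUP)
		io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP);
}

With SQPOLL the SQEs must reference files registered via io_uring_register(2)
(added in the parent commit 6b06314); a plain fd is completed with -EBADF by
the poll thread, as enforced in io_prep_rw() in the diff below.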

File tree

2 files changed: +253, -8 lines

fs/io_uring.c

Lines changed: 242 additions & 7 deletions
@@ -44,6 +44,7 @@
 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/workqueue.h>
+#include <linux/kthread.h>
 #include <linux/blkdev.h>
 #include <linux/bvec.h>
 #include <linux/net.h>
@@ -108,12 +109,16 @@ struct io_ring_ctx {
 		unsigned		cached_sq_head;
 		unsigned		sq_entries;
 		unsigned		sq_mask;
+		unsigned		sq_thread_idle;
 		struct io_uring_sqe	*sq_sqes;
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
 	struct workqueue_struct	*sqo_wq;
+	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
+	wait_queue_head_t	sqo_wait;
+	unsigned		sqo_stop;
 
 	struct {
 		/* CQ ring */
@@ -168,6 +173,7 @@ struct sqe_submit {
 	unsigned short			index;
 	bool				has_user;
 	bool				needs_lock;
+	bool				needs_fixed_file;
 };
 
 struct io_kiocb {
@@ -327,6 +333,8 @@ static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
 }
 
 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
@@ -680,9 +688,10 @@ static bool io_file_supports_async(struct file *file)
 	return false;
 }
 
-static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
 		      bool force_nonblock, struct io_submit_state *state)
 {
+	const struct io_uring_sqe *sqe = s->sqe;
 	struct io_ring_ctx *ctx = req->ctx;
 	struct kiocb *kiocb = &req->rw;
 	unsigned ioprio, flags;
@@ -702,6 +711,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		kiocb->ki_filp = ctx->user_files[fd];
 		req->flags |= REQ_F_FIXED_FILE;
 	} else {
+		if (s->needs_fixed_file)
+			return -EBADF;
 		kiocb->ki_filp = io_file_get(state, fd);
 		if (unlikely(!kiocb->ki_filp))
 			return -EBADF;
@@ -865,7 +876,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
 	struct file *file;
 	ssize_t ret;
 
-	ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
 	file = kiocb->ki_filp;
@@ -909,7 +920,7 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
 	struct file *file;
 	ssize_t ret;
 
-	ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
 	/* Hold on to the file for -EAGAIN */
@@ -1301,6 +1312,169 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 	return false;
 }
 
+static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
+			  unsigned int nr, bool has_user, bool mm_fault)
+{
+	struct io_submit_state state, *statep = NULL;
+	int ret, i, submitted = 0;
+
+	if (nr > IO_PLUG_THRESHOLD) {
+		io_submit_state_start(&state, ctx, nr);
+		statep = &state;
+	}
+
+	for (i = 0; i < nr; i++) {
+		if (unlikely(mm_fault)) {
+			ret = -EFAULT;
+		} else {
+			sqes[i].has_user = has_user;
+			sqes[i].needs_lock = true;
+			sqes[i].needs_fixed_file = true;
+			ret = io_submit_sqe(ctx, &sqes[i], statep);
+		}
+		if (!ret) {
+			submitted++;
+			continue;
+		}
+
+		io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
+	}
+
+	if (statep)
+		io_submit_state_end(&state);
+
+	return submitted;
+}
+
+static int io_sq_thread(void *data)
+{
+	struct sqe_submit sqes[IO_IOPOLL_BATCH];
+	struct io_ring_ctx *ctx = data;
+	struct mm_struct *cur_mm = NULL;
+	mm_segment_t old_fs;
+	DEFINE_WAIT(wait);
+	unsigned inflight;
+	unsigned long timeout;
+
+	old_fs = get_fs();
+	set_fs(USER_DS);
+
+	timeout = inflight = 0;
+	while (!kthread_should_stop() && !ctx->sqo_stop) {
+		bool all_fixed, mm_fault = false;
+		int i;
+
+		if (inflight) {
+			unsigned nr_events = 0;
+
+			if (ctx->flags & IORING_SETUP_IOPOLL) {
+				/*
+				 * We disallow the app entering submit/complete
+				 * with polling, but we still need to lock the
+				 * ring to prevent racing with polled issue
+				 * that got punted to a workqueue.
+				 */
+				mutex_lock(&ctx->uring_lock);
+				io_iopoll_check(ctx, &nr_events, 0);
+				mutex_unlock(&ctx->uring_lock);
+			} else {
+				/*
+				 * Normal IO, just pretend everything completed.
+				 * We don't have to poll completions for that.
+				 */
+				nr_events = inflight;
+			}
+
+			inflight -= nr_events;
+			if (!inflight)
+				timeout = jiffies + ctx->sq_thread_idle;
+		}
+
+		if (!io_get_sqring(ctx, &sqes[0])) {
+			/*
+			 * We're polling. If we're within the defined idle
+			 * period, then let us spin without work before going
+			 * to sleep.
+			 */
+			if (inflight || !time_after(jiffies, timeout)) {
+				cpu_relax();
+				continue;
+			}
+
+			/*
+			 * Drop cur_mm before scheduling, we can't hold it for
+			 * long periods (or over schedule()). Do this before
+			 * adding ourselves to the waitqueue, as the unuse/drop
+			 * may sleep.
+			 */
+			if (cur_mm) {
+				unuse_mm(cur_mm);
+				mmput(cur_mm);
+				cur_mm = NULL;
+			}
+
+			prepare_to_wait(&ctx->sqo_wait, &wait,
+						TASK_INTERRUPTIBLE);
+
+			/* Tell userspace we may need a wakeup call */
+			ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+
+			if (!io_get_sqring(ctx, &sqes[0])) {
+				if (kthread_should_stop()) {
+					finish_wait(&ctx->sqo_wait, &wait);
+					break;
+				}
+				if (signal_pending(current))
+					flush_signals(current);
+				schedule();
+				finish_wait(&ctx->sqo_wait, &wait);
+
+				ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+				smp_wmb();
+				continue;
+			}
+			finish_wait(&ctx->sqo_wait, &wait);
+
+			ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+		}
+
+		i = 0;
+		all_fixed = true;
+		do {
+			if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
+				all_fixed = false;
+
+			i++;
+			if (i == ARRAY_SIZE(sqes))
+				break;
+		} while (io_get_sqring(ctx, &sqes[i]));
+
+		/* Unless all new commands are FIXED regions, grab mm */
+		if (!all_fixed && !cur_mm) {
+			mm_fault = !mmget_not_zero(ctx->sqo_mm);
+			if (!mm_fault) {
+				use_mm(ctx->sqo_mm);
+				cur_mm = ctx->sqo_mm;
+			}
+		}
+
+		inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
+						mm_fault);
+
+		/* Commit SQ ring head once we've consumed all SQEs */
+		io_commit_sqring(ctx);
+	}
+
+	set_fs(old_fs);
+	if (cur_mm) {
+		unuse_mm(cur_mm);
+		mmput(cur_mm);
+	}
+	return 0;
+}
+
 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
 	struct io_submit_state state, *statep = NULL;
@@ -1319,6 +1493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 
 		s.has_user = true;
 		s.needs_lock = false;
+		s.needs_fixed_file = false;
 
 		ret = io_submit_sqe(ctx, &s, statep);
 		if (ret) {
@@ -1418,8 +1593,20 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 	return 0;
 }
 
+static void io_sq_thread_stop(struct io_ring_ctx *ctx)
+{
+	if (ctx->sqo_thread) {
+		ctx->sqo_stop = 1;
+		mb();
+		kthread_stop(ctx->sqo_thread);
+		ctx->sqo_thread = NULL;
+	}
+}
+
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+	io_sq_thread_stop(ctx);
+
 	if (ctx->sqo_wq) {
 		destroy_workqueue(ctx->sqo_wq);
 		ctx->sqo_wq = NULL;
@@ -1583,13 +1770,47 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 	return ret;
 }
 
-static int io_sq_offload_start(struct io_ring_ctx *ctx)
+static int io_sq_offload_start(struct io_ring_ctx *ctx,
+			       struct io_uring_params *p)
 {
 	int ret;
 
+	init_waitqueue_head(&ctx->sqo_wait);
 	mmgrab(current->mm);
 	ctx->sqo_mm = current->mm;
 
+	ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
+	if (!ctx->sq_thread_idle)
+		ctx->sq_thread_idle = HZ;
+
+	ret = -EINVAL;
+	if (!cpu_possible(p->sq_thread_cpu))
+		goto err;
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (p->flags & IORING_SETUP_SQ_AFF) {
+			int cpu;
+
+			cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
+			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
+							ctx, cpu,
+							"io_uring-sq");
+		} else {
+			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
+							"io_uring-sq");
+		}
+		if (IS_ERR(ctx->sqo_thread)) {
+			ret = PTR_ERR(ctx->sqo_thread);
+			ctx->sqo_thread = NULL;
+			goto err;
+		}
+		wake_up_process(ctx->sqo_thread);
+	} else if (p->flags & IORING_SETUP_SQ_AFF) {
+		/* Can't have SQ_AFF without SQPOLL */
+		ret = -EINVAL;
+		goto err;
+	}
+
 	/* Do QD, or 2 * CPUS, whatever is smallest */
 	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
 			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
@@ -1600,6 +1821,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
 
 	return 0;
 err:
+	io_sq_thread_stop(ctx);
 	mmdrop(ctx->sqo_mm);
 	ctx->sqo_mm = NULL;
 	return ret;
@@ -1959,7 +2181,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	int submitted = 0;
 	struct fd f;
 
-	if (flags & ~IORING_ENTER_GETEVENTS)
+	if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
 		return -EINVAL;
 
 	f = fdget(fd);
@@ -1975,6 +2197,18 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	if (!percpu_ref_tryget(&ctx->refs))
 		goto out_fput;
 
+	/*
+	 * For SQ polling, the thread will do all submissions and completions.
+	 * Just return the requested submit count, and wake the thread if
+	 * we were asked to.
+	 */
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (flags & IORING_ENTER_SQ_WAKEUP)
+			wake_up(&ctx->sqo_wait);
+		submitted = to_submit;
+		goto out_ctx;
+	}
+
 	ret = 0;
 	if (to_submit) {
 		to_submit = min(to_submit, ctx->sq_entries);
@@ -2156,7 +2390,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
 	if (ret)
 		goto err;
 
-	ret = io_sq_offload_start(ctx);
+	ret = io_sq_offload_start(ctx, p);
 	if (ret)
 		goto err;
 
@@ -2204,7 +2438,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 		return -EINVAL;
 	}
 
-	if (p.flags & ~IORING_SETUP_IOPOLL)
+	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
+			IORING_SETUP_SQ_AFF))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
