@@ -44,6 +44,7 @@
 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/workqueue.h>
+#include <linux/kthread.h>
 #include <linux/blkdev.h>
 #include <linux/bvec.h>
 #include <linux/net.h>
@@ -108,12 +109,16 @@ struct io_ring_ctx {
 		unsigned		cached_sq_head;
 		unsigned		sq_entries;
 		unsigned		sq_mask;
+		unsigned		sq_thread_idle;
 		struct io_uring_sqe	*sq_sqes;
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
 	struct workqueue_struct	*sqo_wq;
+	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
+	wait_queue_head_t	sqo_wait;
+	unsigned		sqo_stop;
 
 	struct {
 		/* CQ ring */
@@ -168,6 +173,7 @@ struct sqe_submit {
 	unsigned short			index;
 	bool				has_user;
 	bool				needs_lock;
+	bool				needs_fixed_file;
 };
 
 struct io_kiocb {
@@ -327,6 +333,8 @@ static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
 }
 
 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
@@ -680,9 +688,10 @@ static bool io_file_supports_async(struct file *file)
 	return false;
 }
 
-static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
 		      bool force_nonblock, struct io_submit_state *state)
 {
+	const struct io_uring_sqe *sqe = s->sqe;
 	struct io_ring_ctx *ctx = req->ctx;
 	struct kiocb *kiocb = &req->rw;
 	unsigned ioprio, flags;
@@ -702,6 +711,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		kiocb->ki_filp = ctx->user_files[fd];
 		req->flags |= REQ_F_FIXED_FILE;
 	} else {
+		if (s->needs_fixed_file)
+			return -EBADF;
 		kiocb->ki_filp = io_file_get(state, fd);
 		if (unlikely(!kiocb->ki_filp))
 			return -EBADF;
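
Submissions issued on behalf of the application by the offload thread are flagged needs_fixed_file (set later in io_submit_sqes), and the hunk above rejects plain file descriptors for them with -EBADF, since the thread cannot safely resolve fds from the submitter's file table. An application using SQ polling is therefore expected to register its files up front and refer to them by index with IOSQE_FIXED_FILE. A minimal userspace sketch of that side (the syscall wrapper and prep helper names are illustrative, not part of this patch):

/* Sketch: register files and build a fixed-file readv SQE. */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <unistd.h>

#ifndef __NR_io_uring_register
#define __NR_io_uring_register 427	/* x86-64 */
#endif

static int ring_register_files(int ring_fd, const int *fds, unsigned nr)
{
	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_REGISTER_FILES, fds, nr);
}

static void prep_fixed_readv(struct io_uring_sqe *sqe, int file_index,
			     const struct iovec *iov, unsigned nr_vecs,
			     off_t offset)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_READV;
	sqe->flags = IOSQE_FIXED_FILE;	/* fd below is an index into the registered set */
	sqe->fd = file_index;
	sqe->addr = (unsigned long) iov;
	sqe->len = nr_vecs;
	sqe->off = offset;
}
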
@@ -865,7 +876,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
 	struct file *file;
 	ssize_t ret;
 
-	ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
 	file = kiocb->ki_filp;
@@ -909,7 +920,7 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
 	struct file *file;
 	ssize_t ret;
 
-	ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
 	/* Hold on to the file for -EAGAIN */
@@ -1301,6 +1312,169 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 	return false;
 }
 
+static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
+			  unsigned int nr, bool has_user, bool mm_fault)
+{
+	struct io_submit_state state, *statep = NULL;
+	int ret, i, submitted = 0;
+
+	if (nr > IO_PLUG_THRESHOLD) {
+		io_submit_state_start(&state, ctx, nr);
+		statep = &state;
+	}
+
+	for (i = 0; i < nr; i++) {
+		if (unlikely(mm_fault)) {
+			ret = -EFAULT;
+		} else {
+			sqes[i].has_user = has_user;
+			sqes[i].needs_lock = true;
+			sqes[i].needs_fixed_file = true;
+			ret = io_submit_sqe(ctx, &sqes[i], statep);
+		}
+		if (!ret) {
+			submitted++;
+			continue;
+		}
+
+		io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
+	}
+
+	if (statep)
+		io_submit_state_end(&state);
+
+	return submitted;
+}
+
+static int io_sq_thread(void *data)
+{
+	struct sqe_submit sqes[IO_IOPOLL_BATCH];
+	struct io_ring_ctx *ctx = data;
+	struct mm_struct *cur_mm = NULL;
+	mm_segment_t old_fs;
+	DEFINE_WAIT(wait);
+	unsigned inflight;
+	unsigned long timeout;
+
+	old_fs = get_fs();
+	set_fs(USER_DS);
+
+	timeout = inflight = 0;
+	while (!kthread_should_stop() && !ctx->sqo_stop) {
+		bool all_fixed, mm_fault = false;
+		int i;
+
+		if (inflight) {
+			unsigned nr_events = 0;
+
+			if (ctx->flags & IORING_SETUP_IOPOLL) {
+				/*
+				 * We disallow the app entering submit/complete
+				 * with polling, but we still need to lock the
+				 * ring to prevent racing with polled issue
+				 * that got punted to a workqueue.
+				 */
+				mutex_lock(&ctx->uring_lock);
+				io_iopoll_check(ctx, &nr_events, 0);
+				mutex_unlock(&ctx->uring_lock);
+			} else {
+				/*
+				 * Normal IO, just pretend everything completed.
+				 * We don't have to poll completions for that.
+				 */
+				nr_events = inflight;
+			}
+
+			inflight -= nr_events;
+			if (!inflight)
+				timeout = jiffies + ctx->sq_thread_idle;
+		}
+
+		if (!io_get_sqring(ctx, &sqes[0])) {
+			/*
+			 * We're polling. If we're within the defined idle
+			 * period, then let us spin without work before going
+			 * to sleep.
+			 */
+			if (inflight || !time_after(jiffies, timeout)) {
+				cpu_relax();
+				continue;
+			}
+
+			/*
+			 * Drop cur_mm before scheduling, we can't hold it for
+			 * long periods (or over schedule()). Do this before
+			 * adding ourselves to the waitqueue, as the unuse/drop
+			 * may sleep.
+			 */
+			if (cur_mm) {
+				unuse_mm(cur_mm);
+				mmput(cur_mm);
+				cur_mm = NULL;
+			}
+
+			prepare_to_wait(&ctx->sqo_wait, &wait,
+						TASK_INTERRUPTIBLE);
+
+			/* Tell userspace we may need a wakeup call */
+			ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+
+			if (!io_get_sqring(ctx, &sqes[0])) {
+				if (kthread_should_stop()) {
+					finish_wait(&ctx->sqo_wait, &wait);
+					break;
+				}
+				if (signal_pending(current))
+					flush_signals(current);
+				schedule();
+				finish_wait(&ctx->sqo_wait, &wait);
+
+				ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+				smp_wmb();
+				continue;
+			}
+			finish_wait(&ctx->sqo_wait, &wait);
+
+			ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+		}
+
+		i = 0;
+		all_fixed = true;
+		do {
+			if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
+				all_fixed = false;
+
+			i++;
+			if (i == ARRAY_SIZE(sqes))
+				break;
+		} while (io_get_sqring(ctx, &sqes[i]));
+
+		/* Unless all new commands are FIXED regions, grab mm */
+		if (!all_fixed && !cur_mm) {
+			mm_fault = !mmget_not_zero(ctx->sqo_mm);
+			if (!mm_fault) {
+				use_mm(ctx->sqo_mm);
+				cur_mm = ctx->sqo_mm;
+			}
+		}
+
+		inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
+						mm_fault);
+
+		/* Commit SQ ring head once we've consumed all SQEs */
+		io_commit_sqring(ctx);
+	}
+
+	set_fs(old_fs);
+	if (cur_mm) {
+		unuse_mm(cur_mm);
+		mmput(cur_mm);
+	}
+	return 0;
+}
+
 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
 	struct io_submit_state state, *statep = NULL;
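
The thread above advertises that it is about to sleep by setting IORING_SQ_NEED_WAKEUP in the shared SQ ring flags (published with smp_wmb()) and clears the flag again once it finds work. The submitter's half of that handshake, in userspace, might look roughly like the sketch below; the wrapper name and the sq_flags pointer into the mmap'ed SQ ring are assumptions for illustration, and a real submitter needs a proper memory barrier between updating the SQ tail and reading the flags (a plain volatile load is used here only to keep the sketch short):

/* Sketch: submitter-side check of the SQ thread wakeup flag. */
#include <linux/io_uring.h>
#include <stddef.h>
#include <sys/syscall.h>
#include <unistd.h>

#ifndef __NR_io_uring_enter
#define __NR_io_uring_enter 426	/* x86-64 */
#endif

/* 'sq_flags' points at the flags word in the mmap'ed SQ ring. */
static void sq_thread_kick_if_needed(int ring_fd, const unsigned *sq_flags)
{
	unsigned flags = *(volatile const unsigned *)sq_flags;

	if (flags & IORING_SQ_NEED_WAKEUP)
		syscall(__NR_io_uring_enter, ring_fd, 0, 0,
			IORING_ENTER_SQ_WAKEUP, NULL, 0);
}
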
@@ -1319,6 +1493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 
 		s.has_user = true;
 		s.needs_lock = false;
+		s.needs_fixed_file = false;
 
 		ret = io_submit_sqe(ctx, &s, statep);
 		if (ret) {
@@ -1418,8 +1593,20 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 	return 0;
 }
 
+static void io_sq_thread_stop(struct io_ring_ctx *ctx)
+{
+	if (ctx->sqo_thread) {
+		ctx->sqo_stop = 1;
+		mb();
+		kthread_stop(ctx->sqo_thread);
+		ctx->sqo_thread = NULL;
+	}
+}
+
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+	io_sq_thread_stop(ctx);
+
 	if (ctx->sqo_wq) {
 		destroy_workqueue(ctx->sqo_wq);
 		ctx->sqo_wq = NULL;
@@ -1583,13 +1770,47 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 	return ret;
 }
 
-static int io_sq_offload_start(struct io_ring_ctx *ctx)
+static int io_sq_offload_start(struct io_ring_ctx *ctx,
+			       struct io_uring_params *p)
 {
 	int ret;
 
+	init_waitqueue_head(&ctx->sqo_wait);
 	mmgrab(current->mm);
 	ctx->sqo_mm = current->mm;
 
+	ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
+	if (!ctx->sq_thread_idle)
+		ctx->sq_thread_idle = HZ;
+
+	ret = -EINVAL;
+	if (!cpu_possible(p->sq_thread_cpu))
+		goto err;
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (p->flags & IORING_SETUP_SQ_AFF) {
+			int cpu;
+
+			cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
+			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
+							ctx, cpu,
+							"io_uring-sq");
+		} else {
+			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
+							"io_uring-sq");
+		}
+		if (IS_ERR(ctx->sqo_thread)) {
+			ret = PTR_ERR(ctx->sqo_thread);
+			ctx->sqo_thread = NULL;
+			goto err;
+		}
+		wake_up_process(ctx->sqo_thread);
+	} else if (p->flags & IORING_SETUP_SQ_AFF) {
+		/* Can't have SQ_AFF without SQPOLL */
+		ret = -EINVAL;
+		goto err;
+	}
+
 	/* Do QD, or 2 * CPUS, whatever is smallest */
 	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
 			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
@@ -1600,6 +1821,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
 
 	return 0;
 err:
+	io_sq_thread_stop(ctx);
 	mmdrop(ctx->sqo_mm);
 	ctx->sqo_mm = NULL;
 	return ret;
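
The two new io_uring_params fields consumed above, sq_thread_cpu and sq_thread_idle, are supplied at setup time. A setup call asking for a polling thread pinned to CPU 0 that busy-polls for about 1000 ms before sleeping could look roughly like this (illustrative userspace sketch, not part of the patch; note that SQPOLL setup may also require elevated privileges depending on kernel version):

/* Sketch: creating a ring with a pinned SQ polling thread. */
#include <linux/io_uring.h>
#include <stdio.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

#ifndef __NR_io_uring_setup
#define __NR_io_uring_setup 425	/* x86-64 */
#endif

int main(void)
{
	struct io_uring_params p;
	int ring_fd;

	memset(&p, 0, sizeof(p));
	p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
	p.sq_thread_cpu = 0;		/* must be a possible CPU, see cpu_possible() above */
	p.sq_thread_idle = 1000;	/* msecs of idle spinning; 0 falls back to HZ (one second) */

	ring_fd = syscall(__NR_io_uring_setup, 64, &p);
	if (ring_fd < 0) {
		perror("io_uring_setup");
		return 1;
	}

	/* Next: mmap the SQ/CQ rings and SQE array using p.sq_off / p.cq_off. */
	close(ring_fd);
	return 0;
}
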
@@ -1959,7 +2181,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	int submitted = 0;
 	struct fd f;
 
-	if (flags & ~IORING_ENTER_GETEVENTS)
+	if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
 		return -EINVAL;
 
 	f = fdget(fd);
@@ -1975,6 +2197,18 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	if (!percpu_ref_tryget(&ctx->refs))
 		goto out_fput;
 
+	/*
+	 * For SQ polling, the thread will do all submissions and completions.
+	 * Just return the requested submit count, and wake the thread if
+	 * we were asked to.
+	 */
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (flags & IORING_ENTER_SQ_WAKEUP)
+			wake_up(&ctx->sqo_wait);
+		submitted = to_submit;
+		goto out_ctx;
+	}
+
 	ret = 0;
 	if (to_submit) {
 		to_submit = min(to_submit, ctx->sq_entries);
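
As added here, the SQPOLL branch returns before the IORING_ENTER_GETEVENTS handling further down, so an application in this mode does not sleep in io_uring_enter for completions either; it reaps CQEs directly from the mmap'ed CQ ring. A rough sketch of that reap loop (struct and helper names are assumptions for illustration; a real implementation wants a load-acquire on the tail and a store-release on the head rather than the plain volatile accesses used here):

/* Sketch: reaping completions straight from the shared CQ ring. */
#include <linux/io_uring.h>
#include <stdio.h>

struct app_cq {
	unsigned *khead;		/* CQ head, advanced by the application */
	const unsigned *ktail;		/* CQ tail, advanced by the kernel */
	unsigned ring_mask;
	struct io_uring_cqe *cqes;
};

static unsigned reap_cqes(struct app_cq *cq)
{
	unsigned head = *cq->khead;
	unsigned tail = *(volatile const unsigned *)cq->ktail;
	unsigned count = 0;

	while (head != tail) {
		struct io_uring_cqe *cqe = &cq->cqes[head & cq->ring_mask];

		printf("completed user_data=%llu res=%d\n",
		       (unsigned long long) cqe->user_data, cqe->res);
		head++;
		count++;
	}

	/* Publish the new head so the kernel can reuse those CQEs. */
	*(volatile unsigned *)cq->khead = head;
	return count;
}
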
@@ -2156,7 +2390,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
 	if (ret)
 		goto err;
 
-	ret = io_sq_offload_start(ctx);
+	ret = io_sq_offload_start(ctx, p);
 	if (ret)
 		goto err;
 
@@ -2204,7 +2438,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 		return -EINVAL;
 	}
 
-	if (p.flags & ~IORING_SETUP_IOPOLL)
+	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
+			IORING_SETUP_SQ_AFF))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);