@@ -68,8 +68,6 @@
 static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
 static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
 static void	xprt_connect_status(struct rpc_task *task);
-static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
-static void	__xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
 static void	xprt_destroy(struct rpc_xprt *xprt);

 static DEFINE_SPINLOCK(xprt_list_lock);
@@ -221,13 +219,39 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
 	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 }

+static bool
+xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
+{
+	return test_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
+static void
+xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
+{
+	if (!list_empty(&xprt->xmit_queue)) {
+		/* Peek at head of queue to see if it can make progress */
+		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
+					rq_xmit)->rq_cong)
+			return;
+	}
+	set_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
+static void
+xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
+{
+	if (!RPCXPRT_CONGESTED(xprt))
+		clear_bit(XPRT_CWND_WAIT, &xprt->state);
+}
+
 /*
  * xprt_reserve_xprt_cong - serialize write access to transports
  * @task: task that is requesting access to the transport
  *
  * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
  * integrated into the decision of whether a request is allowed to be
  * woken up and given access to the transport.
+ * Note that the lock is only granted if we know there are free slots.
  */
 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
@@ -243,14 +267,12 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 		xprt->snd_task = task;
 		return 1;
 	}
-	if (__xprt_get_cong(xprt, task)) {
+	if (!xprt_need_congestion_window_wait(xprt)) {
 		xprt->snd_task = task;
 		return 1;
 	}
 	xprt_clear_locked(xprt);
 out_sleep:
-	if (req)
-		__xprt_put_cong(xprt, req);
 	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
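
The two hunks above replace the old per-task __xprt_get_cong() check in the reservation path with a single transport-level gate: XPRT_CWND_WAIT is only raised when the request at the head of xprt->xmit_queue holds no congestion credit (i.e. the queue cannot make progress), and xprt_reserve_xprt_cong() now grants the write lock only while that flag is clear. A minimal userspace sketch of that gating, using hypothetical stand-in types rather than the kernel API:

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the kernel structures, for illustration only. */
struct rqst {
	bool rq_cong;			/* does this request hold a credit? */
};

struct xprt {
	bool cwnd_wait;			/* models the XPRT_CWND_WAIT bit */
	struct rqst *xmit_head;		/* head of the transmit queue */
};

/* Raise the wait flag only if the head of the queue cannot make progress. */
static void set_congestion_window_wait(struct xprt *x)
{
	if (x->xmit_head && x->xmit_head->rq_cong)
		return;			/* head holds a credit: no need to wait */
	x->cwnd_wait = true;
}

/* The reservation path hands out the write lock only while the flag is clear. */
static bool reserve_xprt_cong(const struct xprt *x)
{
	return !x->cwnd_wait;
}

int main(void)
{
	struct rqst head = { .rq_cong = true };
	struct xprt x = { .cwnd_wait = false, .xmit_head = &head };

	set_congestion_window_wait(&x);
	printf("reserve with credited head: %d\n", reserve_xprt_cong(&x)); /* 1 */

	head.rq_cong = false;
	set_congestion_window_wait(&x);
	printf("reserve with stalled head:  %d\n", reserve_xprt_cong(&x)); /* 0 */
	return 0;
}

The point of peeking at the queue head is that a transport whose next transmission already holds a credit should not be marked as congestion-blocked.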
@@ -294,32 +316,14 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 	xprt_clear_locked(xprt);
 }

-static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
-{
-	struct rpc_xprt *xprt = data;
-	struct rpc_rqst *req;
-
-	req = task->tk_rqstp;
-	if (req == NULL) {
-		xprt->snd_task = task;
-		return true;
-	}
-	if (__xprt_get_cong(xprt, task)) {
-		xprt->snd_task = task;
-		req->rq_ntrans++;
-		return true;
-	}
-	return false;
-}
-
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
-	if (RPCXPRT_CONGESTED(xprt))
+	if (xprt_need_congestion_window_wait(xprt))
 		goto out_unlock;
 	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
-				__xprt_lock_write_cong_func, xprt))
+				__xprt_lock_write_func, xprt))
 		return;
 out_unlock:
 	xprt_clear_locked(xprt);
@@ -370,16 +374,16 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
  * overflowed. Put the task to sleep if this is the case.
  */
 static int
-__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
-
 	if (req->rq_cong)
 		return 1;
 	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
-			task->tk_pid, xprt->cong, xprt->cwnd);
-	if (RPCXPRT_CONGESTED(xprt))
+			req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
+	if (RPCXPRT_CONGESTED(xprt)) {
+		xprt_set_congestion_window_wait(xprt);
 		return 0;
+	}
 	req->rq_cong = 1;
 	xprt->cong += RPC_CWNDSCALE;
 	return 1;
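
__xprt_get_cong() itself still hands out credits in units of RPC_CWNDSCALE and refuses once the window is full; the behavioural change here is that the congested branch now also raises XPRT_CWND_WAIT. A rough, self-contained sketch of the accounting, assuming the usual "cong >= cwnd" congestion test and an illustrative scale constant rather than the kernel's values:

#include <stdio.h>

#define CWNDSCALE 256UL		/* illustrative scale factor, not the kernel constant */

int main(void)
{
	unsigned long cwnd = 4 * CWNDSCALE;	/* window worth four credits */
	unsigned long cong = 0;
	unsigned int granted = 0;

	/* Grant credits until cong reaches cwnd, mirroring __xprt_get_cong(). */
	while (cong < cwnd) {
		cong += CWNDSCALE;
		granted++;
	}
	printf("granted %u credits, cong=%lu cwnd=%lu\n", granted, cong, cwnd);
	return 0;
}

With a window of four credit units, the fifth request finds the transport congested and, under the new code, marks it as waiting on the congestion window.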
@@ -396,9 +400,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
 		return;
 	req->rq_cong = 0;
 	xprt->cong -= RPC_CWNDSCALE;
+	xprt_test_and_clear_congestion_window_wait(xprt);
 	__xprt_lock_write_next_cong(xprt);
 }

+/**
+ * xprt_request_get_cong - Request congestion control credits
+ * @xprt: pointer to transport
+ * @req: pointer to RPC request
+ *
+ * Useful for transports that require congestion control.
+ */
+bool
+xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	bool ret = false;
+
+	if (req->rq_cong)
+		return true;
+	spin_lock_bh(&xprt->transport_lock);
+	ret = __xprt_get_cong(xprt, req) != 0;
+	spin_unlock_bh(&xprt->transport_lock);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(xprt_request_get_cong);
+
 /**
  * xprt_release_rqst_cong - housekeeping when request is complete
  * @task: RPC request that recently completed
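
The exported xprt_request_get_cong() follows a common check-then-lock shape: a request that already holds a credit returns immediately, and only the slow path takes transport_lock to try to claim one. A small pthread-based sketch of the same shape, with hypothetical names standing in for the kernel locking primitives:

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins: 'creditor' plays the transport, 'request' the rqst. */
struct creditor {
	pthread_mutex_t lock;		/* stands in for xprt->transport_lock */
	unsigned long cong, cwnd;	/* current usage vs. window, in credits */
};

struct request {
	bool has_credit;		/* stands in for req->rq_cong */
};

/* Fast path: a request that already holds a credit never touches the lock. */
static bool request_get_credit(struct creditor *c, struct request *r)
{
	bool ret = false;

	if (r->has_credit)
		return true;
	pthread_mutex_lock(&c->lock);
	if (c->cong < c->cwnd) {	/* window not yet full */
		c->cong++;
		r->has_credit = true;
		ret = true;
	}
	pthread_mutex_unlock(&c->lock);
	return ret;
}

int main(void)
{
	struct creditor c = { PTHREAD_MUTEX_INITIALIZER, 0, 2 };
	struct request a = { false }, b = { false }, d = { false };

	printf("%d %d %d\n", request_get_credit(&c, &a),
	       request_get_credit(&c, &b), request_get_credit(&c, &d)); /* 1 1 0 */
	return 0;
}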
@@ -413,6 +439,20 @@ void xprt_release_rqst_cong(struct rpc_task *task)
 }
 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

+/*
+ * Clear the congestion window wait flag and wake up the next
+ * entry on xprt->sending
+ */
+static void
+xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
+{
+	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
+		spin_lock_bh(&xprt->transport_lock);
+		__xprt_lock_write_next_cong(xprt);
+		spin_unlock_bh(&xprt->transport_lock);
+	}
+}
+
 /**
  * xprt_adjust_cwnd - adjust transport congestion window
  * @xprt: pointer to xprt
@@ -1058,12 +1098,28 @@ xprt_request_enqueue_transmit(struct rpc_task *task)

 	if (xprt_request_need_enqueue_transmit(task, req)) {
 		spin_lock(&xprt->queue_lock);
-		list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
-			if (pos->rq_task->tk_owner != task->tk_owner)
-				continue;
-			list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
-			INIT_LIST_HEAD(&req->rq_xmit);
-			goto out;
+		/*
+		 * Requests that carry congestion control credits are added
+		 * to the head of the list to avoid starvation issues.
+		 */
+		if (req->rq_cong) {
+			xprt_clear_congestion_window_wait(xprt);
+			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+				if (pos->rq_cong)
+					continue;
+				/* Note: req is added _before_ pos */
+				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
+				INIT_LIST_HEAD(&req->rq_xmit2);
+				goto out;
+			}
+		} else {
+			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
+				if (pos->rq_task->tk_owner != task->tk_owner)
+					continue;
+				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
+				INIT_LIST_HEAD(&req->rq_xmit);
+				goto out;
+			}
 		}
 		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
 		INIT_LIST_HEAD(&req->rq_xmit2);
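
The enqueue change gives requests that already hold congestion credits priority on xprt->xmit_queue: a credit-holder is inserted just before the first queued entry without a credit, so everything transmittable under the current window sorts ahead of everything that must wait, while non-credit requests keep the old grouping by owner. A self-contained sketch of that insertion rule over a plain array, with hypothetical types that only illustrate the ordering:

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

struct rq {
	const char *name;
	bool cong;			/* holds a congestion credit? */
};

/* Insert 'req' before the first queued entry without a credit if it holds
 * one itself; otherwise append it at the tail (owner grouping omitted). */
static void enqueue(struct rq *queue, int *len, struct rq req)
{
	int pos = *len;

	if (req.cong) {
		for (int i = 0; i < *len; i++) {
			if (!queue[i].cong) {	/* first non-credit entry */
				pos = i;
				break;
			}
		}
	}
	memmove(&queue[pos + 1], &queue[pos], (*len - pos) * sizeof(req));
	queue[pos] = req;
	(*len)++;
}

int main(void)
{
	struct rq queue[8];
	int len = 0;

	enqueue(queue, &len, (struct rq){ "A", false });
	enqueue(queue, &len, (struct rq){ "B", false });
	enqueue(queue, &len, (struct rq){ "C", true });	/* jumps ahead of A and B */

	for (int i = 0; i < len; i++)
		printf("%s%s", queue[i].name, i + 1 < len ? " " : "\n");
	/* prints: C A B */
	return 0;
}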