Skip to content

Commit 75891f5

Browse files
author
Trond Myklebust
committed
SUNRPC: Support for congestion control when queuing is enabled
Both RDMA and UDP transports require the request to get a "congestion control" credit before they can be transmitted. Right now, this is done when the request locks the socket.

We'd like it to happen when a request attempts to be transmitted for the first time. In order to support retransmission of requests that already hold such credits, we also want to ensure that they get queued first, so that we don't deadlock with requests that have yet to obtain a credit.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
1 parent 918f3c1 commit 75891f5

File tree

6 files changed

+109
-36
lines changed

6 files changed

+109
-36
lines changed

include/linux/sunrpc/xprt.h

Lines changed: 2 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -397,6 +397,7 @@ void xprt_complete_rqst(struct rpc_task *task, int copied);
397397
void xprt_pin_rqst(struct rpc_rqst *req);
398398
void xprt_unpin_rqst(struct rpc_rqst *req);
399399
void xprt_release_rqst_cong(struct rpc_task *task);
400+
bool xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req);
400401
void xprt_disconnect_done(struct rpc_xprt *xprt);
401402
void xprt_force_disconnect(struct rpc_xprt *xprt);
402403
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
@@ -415,6 +416,7 @@ void xprt_unlock_connect(struct rpc_xprt *, void *);
415416
#define XPRT_BINDING (5)
416417
#define XPRT_CLOSING (6)
417418
#define XPRT_CONGESTED (9)
419+
#define XPRT_CWND_WAIT (10)
418420

419421
static inline void xprt_set_connected(struct rpc_xprt *xprt)
420422
{

net/sunrpc/clnt.c

Lines changed: 5 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -1996,6 +1996,11 @@ call_transmit_status(struct rpc_task *task)
19961996
dprint_status(task);
19971997
xprt_end_transmit(task);
19981998
break;
1999+
case -EBADSLT:
2000+
xprt_end_transmit(task);
2001+
task->tk_action = call_transmit;
2002+
task->tk_status = 0;
2003+
break;
19992004
case -EBADMSG:
20002005
xprt_end_transmit(task);
20012006
task->tk_status = 0;

net/sunrpc/xprt.c

Lines changed: 92 additions & 36 deletions
Original file line number · Diff line number · Diff line change
@@ -68,8 +68,6 @@
6868
static void xprt_init(struct rpc_xprt *xprt, struct net *net);
6969
static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
7070
static void xprt_connect_status(struct rpc_task *task);
71-
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
72-
static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
7371
static void xprt_destroy(struct rpc_xprt *xprt);
7472

7573
static DEFINE_SPINLOCK(xprt_list_lock);
@@ -221,13 +219,39 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
221219
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
222220
}
223221

222+
static bool
223+
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
224+
{
225+
return test_bit(XPRT_CWND_WAIT, &xprt->state);
226+
}
227+
228+
static void
229+
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
230+
{
231+
if (!list_empty(&xprt->xmit_queue)) {
232+
/* Peek at head of queue to see if it can make progress */
233+
if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
234+
rq_xmit)->rq_cong)
235+
return;
236+
}
237+
set_bit(XPRT_CWND_WAIT, &xprt->state);
238+
}
239+
240+
static void
241+
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
242+
{
243+
if (!RPCXPRT_CONGESTED(xprt))
244+
clear_bit(XPRT_CWND_WAIT, &xprt->state);
245+
}
246+
224247
/*
225248
* xprt_reserve_xprt_cong - serialize write access to transports
226249
* @task: task that is requesting access to the transport
227250
*
228251
* Same as xprt_reserve_xprt, but Van Jacobson congestion control is
229252
* integrated into the decision of whether a request is allowed to be
230253
* woken up and given access to the transport.
254+
* Note that the lock is only granted if we know there are free slots.
231255
*/
232256
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
233257
{
@@ -243,14 +267,12 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
243267
xprt->snd_task = task;
244268
return 1;
245269
}
246-
if (__xprt_get_cong(xprt, task)) {
270+
if (!xprt_need_congestion_window_wait(xprt)) {
247271
xprt->snd_task = task;
248272
return 1;
249273
}
250274
xprt_clear_locked(xprt);
251275
out_sleep:
252-
if (req)
253-
__xprt_put_cong(xprt, req);
254276
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
255277
task->tk_timeout = 0;
256278
task->tk_status = -EAGAIN;
@@ -294,32 +316,14 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
294316
xprt_clear_locked(xprt);
295317
}
296318

297-
static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
298-
{
299-
struct rpc_xprt *xprt = data;
300-
struct rpc_rqst *req;
301-
302-
req = task->tk_rqstp;
303-
if (req == NULL) {
304-
xprt->snd_task = task;
305-
return true;
306-
}
307-
if (__xprt_get_cong(xprt, task)) {
308-
xprt->snd_task = task;
309-
req->rq_ntrans++;
310-
return true;
311-
}
312-
return false;
313-
}
314-
315319
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
316320
{
317321
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
318322
return;
319-
if (RPCXPRT_CONGESTED(xprt))
323+
if (xprt_need_congestion_window_wait(xprt))
320324
goto out_unlock;
321325
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
322-
__xprt_lock_write_cong_func, xprt))
326+
__xprt_lock_write_func, xprt))
323327
return;
324328
out_unlock:
325329
xprt_clear_locked(xprt);
@@ -370,16 +374,16 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
370374
* overflowed. Put the task to sleep if this is the case.
371375
*/
372376
static int
373-
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
377+
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
374378
{
375-
struct rpc_rqst *req = task->tk_rqstp;
376-
377379
if (req->rq_cong)
378380
return 1;
379381
dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
380-
task->tk_pid, xprt->cong, xprt->cwnd);
381-
if (RPCXPRT_CONGESTED(xprt))
382+
req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
383+
if (RPCXPRT_CONGESTED(xprt)) {
384+
xprt_set_congestion_window_wait(xprt);
382385
return 0;
386+
}
383387
req->rq_cong = 1;
384388
xprt->cong += RPC_CWNDSCALE;
385389
return 1;
@@ -396,9 +400,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
396400
return;
397401
req->rq_cong = 0;
398402
xprt->cong -= RPC_CWNDSCALE;
403+
xprt_test_and_clear_congestion_window_wait(xprt);
399404
__xprt_lock_write_next_cong(xprt);
400405
}
401406

407+
/**
408+
* xprt_request_get_cong - Request congestion control credits
409+
* @xprt: pointer to transport
410+
* @req: pointer to RPC request
411+
*
412+
* Useful for transports that require congestion control.
413+
*/
414+
bool
415+
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
416+
{
417+
bool ret = false;
418+
419+
if (req->rq_cong)
420+
return true;
421+
spin_lock_bh(&xprt->transport_lock);
422+
ret = __xprt_get_cong(xprt, req) != 0;
423+
spin_unlock_bh(&xprt->transport_lock);
424+
return ret;
425+
}
426+
EXPORT_SYMBOL_GPL(xprt_request_get_cong);
427+
402428
/**
403429
* xprt_release_rqst_cong - housekeeping when request is complete
404430
* @task: RPC request that recently completed
@@ -413,6 +439,20 @@ void xprt_release_rqst_cong(struct rpc_task *task)
413439
}
414440
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
415441

442+
/*
443+
* Clear the congestion window wait flag and wake up the next
444+
* entry on xprt->sending
445+
*/
446+
static void
447+
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
448+
{
449+
if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
450+
spin_lock_bh(&xprt->transport_lock);
451+
__xprt_lock_write_next_cong(xprt);
452+
spin_unlock_bh(&xprt->transport_lock);
453+
}
454+
}
455+
416456
/**
417457
* xprt_adjust_cwnd - adjust transport congestion window
418458
* @xprt: pointer to xprt
@@ -1058,12 +1098,28 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
10581098

10591099
if (xprt_request_need_enqueue_transmit(task, req)) {
10601100
spin_lock(&xprt->queue_lock);
1061-
list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1062-
if (pos->rq_task->tk_owner != task->tk_owner)
1063-
continue;
1064-
list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1065-
INIT_LIST_HEAD(&req->rq_xmit);
1066-
goto out;
1101+
/*
1102+
* Requests that carry congestion control credits are added
1103+
* to the head of the list to avoid starvation issues.
1104+
*/
1105+
if (req->rq_cong) {
1106+
xprt_clear_congestion_window_wait(xprt);
1107+
list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1108+
if (pos->rq_cong)
1109+
continue;
1110+
/* Note: req is added _before_ pos */
1111+
list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1112+
INIT_LIST_HEAD(&req->rq_xmit2);
1113+
goto out;
1114+
}
1115+
} else {
1116+
list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1117+
if (pos->rq_task->tk_owner != task->tk_owner)
1118+
continue;
1119+
list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1120+
INIT_LIST_HEAD(&req->rq_xmit);
1121+
goto out;
1122+
}
10671123
}
10681124
list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
10691125
INIT_LIST_HEAD(&req->rq_xmit2);

net/sunrpc/xprtrdma/backchannel.c

Lines changed: 3 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -200,6 +200,9 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
200200
if (!xprt_connected(rqst->rq_xprt))
201201
goto drop_connection;
202202

203+
if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
204+
return -EBADSLT;
205+
203206
rc = rpcrdma_bc_marshal_reply(rqst);
204207
if (rc < 0)
205208
goto failed_marshal;

net/sunrpc/xprtrdma/transport.c

Lines changed: 3 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -721,6 +721,9 @@ xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
721721
if (!xprt_connected(xprt))
722722
goto drop_connection;
723723

724+
if (!xprt_request_get_cong(xprt, rqst))
725+
return -EBADSLT;
726+
724727
rc = rpcrdma_marshal_req(r_xprt, rqst);
725728
if (rc < 0)
726729
goto failed_marshal;

net/sunrpc/xprtsock.c

Lines changed: 4 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -609,6 +609,10 @@ static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)
609609

610610
if (!xprt_bound(xprt))
611611
return -ENOTCONN;
612+
613+
if (!xprt_request_get_cong(xprt, req))
614+
return -EBADSLT;
615+
612616
req->rq_xtime = ktime_get();
613617
status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
614618
xdr, 0, true, &sent);

0 commit comments

Comments (0)