Skip to content

Commit c544577

Browse files
author
Trond Myklebust
committed
SUNRPC: Clean up transport write space handling
Treat socket write space handling in the same way we now treat transport congestion: by denying the XPRT_LOCK until the transport signals that it has free buffer space. Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
1 parent 36bd7de commit c544577

File tree

8 files changed

+73
-82
lines changed

8 files changed

+73
-82
lines changed

include/linux/sunrpc/svc_xprt.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ struct svc_xprt {
8484
struct sockaddr_storage xpt_remote; /* remote peer's address */
8585
size_t xpt_remotelen; /* length of address */
8686
char xpt_remotebuf[INET6_ADDRSTRLEN + 10];
87-
struct rpc_wait_queue xpt_bc_pending; /* backchannel wait queue */
8887
struct list_head xpt_users; /* callbacks on free */
8988

9089
struct net *xpt_net;

include/linux/sunrpc/xprt.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,8 +387,8 @@ int xprt_load_transport(const char *);
387387
void xprt_set_retrans_timeout_def(struct rpc_task *task);
388388
void xprt_set_retrans_timeout_rtt(struct rpc_task *task);
389389
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status);
390-
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action);
391-
void xprt_write_space(struct rpc_xprt *xprt);
390+
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt);
391+
bool xprt_write_space(struct rpc_xprt *xprt);
392392
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
393393
struct rpc_rqst * xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
394394
void xprt_update_rtt(struct rpc_task *task);
@@ -416,6 +416,7 @@ void xprt_unlock_connect(struct rpc_xprt *, void *);
416416
#define XPRT_CLOSING (6)
417417
#define XPRT_CONGESTED (9)
418418
#define XPRT_CWND_WAIT (10)
419+
#define XPRT_WRITE_SPACE (11)
419420

420421
static inline void xprt_set_connected(struct rpc_xprt *xprt)
421422
{

net/sunrpc/clnt.c

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,13 +1964,14 @@ call_transmit(struct rpc_task *task)
19641964
{
19651965
dprint_status(task);
19661966

1967+
task->tk_status = 0;
1968+
if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
1969+
if (!xprt_prepare_transmit(task))
1970+
return;
1971+
xprt_transmit(task);
1972+
}
19671973
task->tk_action = call_transmit_status;
1968-
if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1969-
return;
1970-
1971-
if (!xprt_prepare_transmit(task))
1972-
return;
1973-
xprt_transmit(task);
1974+
xprt_end_transmit(task);
19741975
}
19751976

19761977
/*
@@ -1986,23 +1987,15 @@ call_transmit_status(struct rpc_task *task)
19861987
* test first.
19871988
*/
19881989
if (task->tk_status == 0) {
1989-
xprt_end_transmit(task);
19901990
xprt_request_wait_receive(task);
19911991
return;
19921992
}
19931993

19941994
switch (task->tk_status) {
19951995
default:
19961996
dprint_status(task);
1997-
xprt_end_transmit(task);
1998-
break;
1999-
case -EBADSLT:
2000-
xprt_end_transmit(task);
2001-
task->tk_action = call_transmit;
2002-
task->tk_status = 0;
20031997
break;
20041998
case -EBADMSG:
2005-
xprt_end_transmit(task);
20061999
task->tk_status = 0;
20072000
task->tk_action = call_encode;
20082001
break;
@@ -2015,6 +2008,7 @@ call_transmit_status(struct rpc_task *task)
20152008
case -ENOBUFS:
20162009
rpc_delay(task, HZ>>2);
20172010
/* fall through */
2011+
case -EBADSLT:
20182012
case -EAGAIN:
20192013
task->tk_action = call_transmit;
20202014
task->tk_status = 0;
@@ -2026,7 +2020,6 @@ call_transmit_status(struct rpc_task *task)
20262020
case -ENETUNREACH:
20272021
case -EPERM:
20282022
if (RPC_IS_SOFTCONN(task)) {
2029-
xprt_end_transmit(task);
20302023
if (!task->tk_msg.rpc_proc->p_proc)
20312024
trace_xprt_ping(task->tk_xprt,
20322025
task->tk_status);
@@ -2069,9 +2062,6 @@ call_bc_transmit(struct rpc_task *task)
20692062

20702063
xprt_transmit(task);
20712064

2072-
if (task->tk_status == -EAGAIN)
2073-
goto out_retry;
2074-
20752065
xprt_end_transmit(task);
20762066
dprint_status(task);
20772067
switch (task->tk_status) {
@@ -2087,6 +2077,8 @@ call_bc_transmit(struct rpc_task *task)
20872077
case -ENOTCONN:
20882078
case -EPIPE:
20892079
break;
2080+
case -EAGAIN:
2081+
goto out_retry;
20902082
case -ETIMEDOUT:
20912083
/*
20922084
* Problem reaching the server. Disconnect and let the

net/sunrpc/svc_xprt.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl,
171171
mutex_init(&xprt->xpt_mutex);
172172
spin_lock_init(&xprt->xpt_lock);
173173
set_bit(XPT_BUSY, &xprt->xpt_flags);
174-
rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
175174
xprt->xpt_net = get_net(net);
176175
strcpy(xprt->xpt_remotebuf, "uninitialized");
177176
}
@@ -895,7 +894,6 @@ int svc_send(struct svc_rqst *rqstp)
895894
else
896895
len = xprt->xpt_ops->xpo_sendto(rqstp);
897896
mutex_unlock(&xprt->xpt_mutex);
898-
rpc_wake_up(&xprt->xpt_bc_pending);
899897
trace_svc_send(rqstp, len);
900898
svc_xprt_release(rqstp);
901899

net/sunrpc/xprt.c

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,17 @@ int xprt_load_transport(const char *transport_name)
169169
}
170170
EXPORT_SYMBOL_GPL(xprt_load_transport);
171171

172+
static void xprt_clear_locked(struct rpc_xprt *xprt)
173+
{
174+
xprt->snd_task = NULL;
175+
if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
176+
smp_mb__before_atomic();
177+
clear_bit(XPRT_LOCKED, &xprt->state);
178+
smp_mb__after_atomic();
179+
} else
180+
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
181+
}
182+
172183
/**
173184
* xprt_reserve_xprt - serialize write access to transports
174185
* @task: task that is requesting access to the transport
@@ -188,10 +199,14 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
188199
return 1;
189200
goto out_sleep;
190201
}
202+
if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
203+
goto out_unlock;
191204
xprt->snd_task = task;
192205

193206
return 1;
194207

208+
out_unlock:
209+
xprt_clear_locked(xprt);
195210
out_sleep:
196211
dprintk("RPC: %5u failed to lock transport %p\n",
197212
task->tk_pid, xprt);
@@ -208,17 +223,6 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
208223
}
209224
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
210225

211-
static void xprt_clear_locked(struct rpc_xprt *xprt)
212-
{
213-
xprt->snd_task = NULL;
214-
if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
215-
smp_mb__before_atomic();
216-
clear_bit(XPRT_LOCKED, &xprt->state);
217-
smp_mb__after_atomic();
218-
} else
219-
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
220-
}
221-
222226
static bool
223227
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
224228
{
@@ -267,10 +271,13 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
267271
xprt->snd_task = task;
268272
return 1;
269273
}
274+
if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
275+
goto out_unlock;
270276
if (!xprt_need_congestion_window_wait(xprt)) {
271277
xprt->snd_task = task;
272278
return 1;
273279
}
280+
out_unlock:
274281
xprt_clear_locked(xprt);
275282
out_sleep:
276283
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
@@ -309,17 +316,21 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
309316
{
310317
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
311318
return;
312-
319+
if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
320+
goto out_unlock;
313321
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
314322
__xprt_lock_write_func, xprt))
315323
return;
324+
out_unlock:
316325
xprt_clear_locked(xprt);
317326
}
318327

319328
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
320329
{
321330
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
322331
return;
332+
if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
333+
goto out_unlock;
323334
if (xprt_need_congestion_window_wait(xprt))
324335
goto out_unlock;
325336
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
@@ -510,39 +521,46 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
510521

511522
/**
512523
* xprt_wait_for_buffer_space - wait for transport output buffer to clear
513-
* @task: task to be put to sleep
514-
* @action: function pointer to be executed after wait
524+
* @xprt: transport
515525
*
516526
* Note that we only set the timer for the case of RPC_IS_SOFT(), since
517527
* we don't in general want to force a socket disconnection due to
518528
* an incomplete RPC call transmission.
519529
*/
520-
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
530+
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
521531
{
522-
struct rpc_rqst *req = task->tk_rqstp;
523-
struct rpc_xprt *xprt = req->rq_xprt;
524-
525-
task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
526-
rpc_sleep_on(&xprt->pending, task, action);
532+
set_bit(XPRT_WRITE_SPACE, &xprt->state);
527533
}
528534
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
529535

536+
static bool
537+
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
538+
{
539+
if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
540+
__xprt_lock_write_next(xprt);
541+
dprintk("RPC: write space: waking waiting task on "
542+
"xprt %p\n", xprt);
543+
return true;
544+
}
545+
return false;
546+
}
547+
530548
/**
531549
* xprt_write_space - wake the task waiting for transport output buffer space
532550
* @xprt: transport with waiting tasks
533551
*
534552
* Can be called in a soft IRQ context, so xprt_write_space never sleeps.
535553
*/
536-
void xprt_write_space(struct rpc_xprt *xprt)
554+
bool xprt_write_space(struct rpc_xprt *xprt)
537555
{
556+
bool ret;
557+
558+
if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
559+
return false;
538560
spin_lock_bh(&xprt->transport_lock);
539-
if (xprt->snd_task) {
540-
dprintk("RPC: write space: waking waiting task on "
541-
"xprt %p\n", xprt);
542-
rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
543-
&xprt->pending, xprt->snd_task);
544-
}
561+
ret = xprt_clear_write_space_locked(xprt);
545562
spin_unlock_bh(&xprt->transport_lock);
563+
return ret;
546564
}
547565
EXPORT_SYMBOL_GPL(xprt_write_space);
548566

@@ -653,6 +671,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt)
653671
dprintk("RPC: disconnected transport %p\n", xprt);
654672
spin_lock_bh(&xprt->transport_lock);
655673
xprt_clear_connected(xprt);
674+
xprt_clear_write_space_locked(xprt);
656675
xprt_wake_pending_tasks(xprt, -EAGAIN);
657676
spin_unlock_bh(&xprt->transport_lock);
658677
}
@@ -1326,9 +1345,7 @@ xprt_transmit(struct rpc_task *task)
13261345
if (!xprt_request_data_received(task) ||
13271346
test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
13281347
continue;
1329-
} else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1330-
rpc_wake_up_queued_task(&xprt->pending, task);
1331-
else
1348+
} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
13321349
task->tk_status = status;
13331350
break;
13341351
}

net/sunrpc/xprtrdma/rpc_rdma.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
866866
out_err:
867867
switch (ret) {
868868
case -EAGAIN:
869-
xprt_wait_for_buffer_space(rqst->rq_task, NULL);
869+
xprt_wait_for_buffer_space(rqst->rq_xprt);
870870
break;
871871
case -ENOBUFS:
872872
break;

net/sunrpc/xprtrdma/svc_rdma_backchannel.c

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,12 +224,7 @@ xprt_rdma_bc_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
224224
dprintk("svcrdma: sending bc call with xid: %08x\n",
225225
be32_to_cpu(rqst->rq_xid));
226226

227-
if (!mutex_trylock(&sxprt->xpt_mutex)) {
228-
rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
229-
if (!mutex_trylock(&sxprt->xpt_mutex))
230-
return -EAGAIN;
231-
rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
232-
}
227+
mutex_lock(&sxprt->xpt_mutex);
233228

234229
ret = -ENOTCONN;
235230
rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);

0 commit comments

Comments
 (0)