Skip to content

Commit 89f90fe

Browse files
author
Trond Myklebust
committed
SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue
Rather than forcing each and every RPC task to grab the socket write lock in order to send itself, we allow whichever task is holding the write lock to attempt to drain the entire transmit queue. Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
1 parent 86aeee0 commit 89f90fe

File tree

1 file changed

+60
-11
lines changed

1 file changed

+60
-11
lines changed

net/sunrpc/xprt.c

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,27 +1224,34 @@ void xprt_end_transmit(struct rpc_task *task)
12241224
}
12251225

12261226
/**
1227-
* xprt_transmit - send an RPC request on a transport
1228-
* @task: controlling RPC task
1227+
* xprt_request_transmit - send an RPC request on a transport
1228+
* @req: pointer to request to transmit
1229+
* @snd_task: RPC task that owns the transport lock
12291230
*
1230-
* We have to copy the iovec because sendmsg fiddles with its contents.
1231+
* This performs the transmission of a single request.
1232+
* Note that if the request is not the same as snd_task, then it
1233+
* does need to be pinned.
1234+
* Returns '0' on success.
12311235
*/
1232-
void xprt_transmit(struct rpc_task *task)
1236+
static int
1237+
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
12331238
{
1234-
struct rpc_rqst *req = task->tk_rqstp;
1235-
struct rpc_xprt *xprt = req->rq_xprt;
1239+
struct rpc_xprt *xprt = req->rq_xprt;
1240+
struct rpc_task *task = req->rq_task;
12361241
unsigned int connect_cookie;
12371242
int is_retrans = RPC_WAS_SENT(task);
12381243
int status;
12391244

12401245
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
12411246

12421247
if (!req->rq_bytes_sent) {
1243-
if (xprt_request_data_received(task))
1248+
if (xprt_request_data_received(task)) {
1249+
status = 0;
12441250
goto out_dequeue;
1251+
}
12451252
/* Verify that our message lies in the RPCSEC_GSS window */
12461253
if (rpcauth_xmit_need_reencode(task)) {
1247-
task->tk_status = -EBADMSG;
1254+
status = -EBADMSG;
12481255
goto out_dequeue;
12491256
}
12501257
}
@@ -1257,12 +1264,11 @@ void xprt_transmit(struct rpc_task *task)
12571264
req->rq_ntrans++;
12581265

12591266
connect_cookie = xprt->connect_cookie;
1260-
status = xprt->ops->send_request(req, task);
1267+
status = xprt->ops->send_request(req, snd_task);
12611268
trace_xprt_transmit(xprt, req->rq_xid, status);
12621269
if (status != 0) {
12631270
req->rq_ntrans--;
1264-
task->tk_status = status;
1265-
return;
1271+
return status;
12661272
}
12671273

12681274
if (is_retrans)
@@ -1284,6 +1290,49 @@ void xprt_transmit(struct rpc_task *task)
12841290
req->rq_connect_cookie = connect_cookie;
12851291
out_dequeue:
12861292
xprt_request_dequeue_transmit(task);
1293+
rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1294+
return status;
1295+
}
1296+
1297+
/**
1298+
* xprt_transmit - send an RPC request on a transport
1299+
* @task: controlling RPC task
1300+
*
1301+
* Attempts to drain the transmit queue. On exit, either the transport
1302+
* signalled an error that needs to be handled before transmission can
1303+
* resume, or @task finished transmitting, and detected that it already
1304+
* received a reply.
1305+
*/
1306+
void
1307+
xprt_transmit(struct rpc_task *task)
1308+
{
1309+
struct rpc_rqst *next, *req = task->tk_rqstp;
1310+
struct rpc_xprt *xprt = req->rq_xprt;
1311+
int status;
1312+
1313+
spin_lock(&xprt->queue_lock);
1314+
while (!list_empty(&xprt->xmit_queue)) {
1315+
next = list_first_entry(&xprt->xmit_queue,
1316+
struct rpc_rqst, rq_xmit);
1317+
xprt_pin_rqst(next);
1318+
spin_unlock(&xprt->queue_lock);
1319+
status = xprt_request_transmit(next, task);
1320+
if (status == -EBADMSG && next != req)
1321+
status = 0;
1322+
cond_resched();
1323+
spin_lock(&xprt->queue_lock);
1324+
xprt_unpin_rqst(next);
1325+
if (status == 0) {
1326+
if (!xprt_request_data_received(task) ||
1327+
test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1328+
continue;
1329+
} else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1330+
rpc_wake_up_queued_task(&xprt->pending, task);
1331+
else
1332+
task->tk_status = status;
1333+
break;
1334+
}
1335+
spin_unlock(&xprt->queue_lock);
12871336
}
12881337

12891338
static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)

0 commit comments

Comments
 (0)