Skip to content

Commit e1352c9

Browse files
chucklever authored and amschuma-ntap committed
xprtrdma: Refactor rpcrdma_reply_handler some more
Clean up: I'd like to be able to invoke the tail of rpcrdma_reply_handler in two different places. Split the tail out into its own helper function. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
1 parent 5381e0e commit e1352c9

File tree

2 files changed

+69
-57
lines changed

2 files changed

+69
-57
lines changed

net/sunrpc/xprtrdma/rpc_rdma.c

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,60 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
12111211
return -EREMOTEIO;
12121212
}
12131213

1214+
/* Perform XID lookup, reconstruction of the RPC reply, and
1215+
* RPC completion while holding the transport lock to ensure
1216+
* the rep, rqst, and rq_task pointers remain stable.
1217+
*/
1218+
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1219+
{
1220+
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1221+
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1222+
struct rpc_rqst *rqst = rep->rr_rqst;
1223+
unsigned long cwnd;
1224+
int status;
1225+
1226+
xprt->reestablish_timeout = 0;
1227+
1228+
switch (rep->rr_proc) {
1229+
case rdma_msg:
1230+
status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1231+
break;
1232+
case rdma_nomsg:
1233+
status = rpcrdma_decode_nomsg(r_xprt, rep);
1234+
break;
1235+
case rdma_error:
1236+
status = rpcrdma_decode_error(r_xprt, rep, rqst);
1237+
break;
1238+
default:
1239+
status = -EIO;
1240+
}
1241+
if (status < 0)
1242+
goto out_badheader;
1243+
1244+
out:
1245+
spin_lock(&xprt->recv_lock);
1246+
cwnd = xprt->cwnd;
1247+
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1248+
if (xprt->cwnd > cwnd)
1249+
xprt_release_rqst_cong(rqst->rq_task);
1250+
1251+
xprt_complete_rqst(rqst->rq_task, status);
1252+
xprt_unpin_rqst(rqst);
1253+
spin_unlock(&xprt->recv_lock);
1254+
return;
1255+
1256+
/* If the incoming reply terminated a pending RPC, the next
1257+
* RPC call will post a replacement receive buffer as it is
1258+
* being marshaled.
1259+
*/
1260+
out_badheader:
1261+
dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1262+
rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
1263+
r_xprt->rx_stats.bad_reply_count++;
1264+
status = -EIO;
1265+
goto out;
1266+
}
1267+
12141268
/* Process received RPC/RDMA messages.
12151269
*
12161270
* Errors must result in the RPC task either being awakened, or
@@ -1225,8 +1279,6 @@ rpcrdma_reply_handler(struct work_struct *work)
12251279
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
12261280
struct rpcrdma_req *req;
12271281
struct rpc_rqst *rqst;
1228-
unsigned long cwnd;
1229-
int status;
12301282
__be32 *p;
12311283

12321284
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
@@ -1263,6 +1315,7 @@ rpcrdma_reply_handler(struct work_struct *work)
12631315
spin_unlock(&xprt->recv_lock);
12641316
req = rpcr_to_rdmar(rqst);
12651317
req->rl_reply = rep;
1318+
rep->rr_rqst = rqst;
12661319

12671320
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
12681321
__func__, rep, req, be32_to_cpu(rep->rr_xid));
@@ -1280,36 +1333,7 @@ rpcrdma_reply_handler(struct work_struct *work)
12801333
&req->rl_registered);
12811334
}
12821335

1283-
xprt->reestablish_timeout = 0;
1284-
1285-
switch (rep->rr_proc) {
1286-
case rdma_msg:
1287-
status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1288-
break;
1289-
case rdma_nomsg:
1290-
status = rpcrdma_decode_nomsg(r_xprt, rep);
1291-
break;
1292-
case rdma_error:
1293-
status = rpcrdma_decode_error(r_xprt, rep, rqst);
1294-
break;
1295-
default:
1296-
status = -EIO;
1297-
}
1298-
if (status < 0)
1299-
goto out_badheader;
1300-
1301-
out:
1302-
spin_lock(&xprt->recv_lock);
1303-
cwnd = xprt->cwnd;
1304-
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1305-
if (xprt->cwnd > cwnd)
1306-
xprt_release_rqst_cong(rqst->rq_task);
1307-
1308-
xprt_complete_rqst(rqst->rq_task, status);
1309-
xprt_unpin_rqst(rqst);
1310-
spin_unlock(&xprt->recv_lock);
1311-
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
1312-
__func__, xprt, rqst, status);
1336+
rpcrdma_complete_rqst(rep);
13131337
return;
13141338

13151339
out_badstatus:
@@ -1325,20 +1349,8 @@ rpcrdma_reply_handler(struct work_struct *work)
13251349
__func__, be32_to_cpu(rep->rr_vers));
13261350
goto repost;
13271351

1328-
/* If the incoming reply terminated a pending RPC, the next
1329-
* RPC call will post a replacement receive buffer as it is
1330-
* being marshaled.
1331-
*/
1332-
out_badheader:
1333-
dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1334-
rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
1335-
r_xprt->rx_stats.bad_reply_count++;
1336-
status = -EIO;
1337-
goto out;
1338-
1339-
/* The req was still available, but by the time the recv_lock
1340-
* was acquired, the rqst and task had been released. Thus the RPC
1341-
* has already been terminated.
1352+
/* The RPC transaction has already been terminated, or the header
1353+
* is corrupt.
13421354
*/
13431355
out_norqst:
13441356
spin_unlock(&xprt->recv_lock);
@@ -1348,7 +1360,6 @@ rpcrdma_reply_handler(struct work_struct *work)
13481360

13491361
out_shortreply:
13501362
dprintk("RPC: %s: short/invalid reply\n", __func__);
1351-
goto repost;
13521363

13531364
/* If no pending RPC transaction was matched, post a replacement
13541365
* receive buffer before returning.

net/sunrpc/xprtrdma/xprt_rdma.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -202,18 +202,17 @@ enum {
202202
};
203203

204204
/*
205-
* struct rpcrdma_rep -- this structure encapsulates state required to recv
206-
* and complete a reply, asychronously. It needs several pieces of
207-
* state:
208-
* o recv buffer (posted to provider)
209-
* o ib_sge (also donated to provider)
210-
* o status of reply (length, success or not)
211-
* o bookkeeping state to get run by reply handler (list, etc)
205+
* struct rpcrdma_rep -- this structure encapsulates state required
206+
* to receive and complete an RPC Reply, asychronously. It needs
207+
* several pieces of state:
212208
*
213-
* These are allocated during initialization, per-transport instance.
209+
* o receive buffer and ib_sge (donated to provider)
210+
* o status of receive (success or not, length, inv rkey)
211+
* o bookkeeping state to get run by reply handler (XDR stream)
214212
*
215-
* N of these are associated with a transport instance, and stored in
216-
* struct rpcrdma_buffer. N is the max number of outstanding requests.
213+
* These structures are allocated during transport initialization.
214+
* N of these are associated with a transport instance, managed by
215+
* struct rpcrdma_buffer. N is the max number of outstanding RPCs.
217216
*/
218217

219218
struct rpcrdma_rep {
@@ -228,6 +227,7 @@ struct rpcrdma_rep {
228227
struct work_struct rr_work;
229228
struct xdr_buf rr_hdrbuf;
230229
struct xdr_stream rr_stream;
230+
struct rpc_rqst *rr_rqst;
231231
struct list_head rr_list;
232232
struct ib_recv_wr rr_recv_wr;
233233
};
@@ -616,6 +616,7 @@ bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
616616
void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
617617
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
618618
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
619+
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
619620
void rpcrdma_reply_handler(struct work_struct *work);
620621

621622
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)

0 commit comments

Comments (0)