@@ -1211,6 +1211,60 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
 	return -EREMOTEIO;
 }
 
+/* Perform XID lookup, reconstruction of the RPC reply, and
+ * RPC completion while holding the transport lock to ensure
+ * the rep, rqst, and rq_task pointers remain stable.
+ */
+void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct rpc_rqst *rqst = rep->rr_rqst;
+	unsigned long cwnd;
+	int status;
+
+	xprt->reestablish_timeout = 0;
+
+	switch (rep->rr_proc) {
+	case rdma_msg:
+		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
+		break;
+	case rdma_nomsg:
+		status = rpcrdma_decode_nomsg(r_xprt, rep);
+		break;
+	case rdma_error:
+		status = rpcrdma_decode_error(r_xprt, rep, rqst);
+		break;
+	default:
+		status = -EIO;
+	}
+	if (status < 0)
+		goto out_badheader;
+
+out:
+	spin_lock(&xprt->recv_lock);
+	cwnd = xprt->cwnd;
+	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+	if (xprt->cwnd > cwnd)
+		xprt_release_rqst_cong(rqst->rq_task);
+
+	xprt_complete_rqst(rqst->rq_task, status);
+	xprt_unpin_rqst(rqst);
+	spin_unlock(&xprt->recv_lock);
+	return;
+
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
+out_badheader:
+	dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+		rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
+	r_xprt->rx_stats.bad_reply_count++;
+	status = -EIO;
+	goto out;
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
@@ -1225,8 +1279,6 @@ rpcrdma_reply_handler(struct work_struct *work)
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	unsigned long cwnd;
-	int status;
 	__be32 *p;
 
 	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
@@ -1263,6 +1315,7 @@ rpcrdma_reply_handler(struct work_struct *work)
 	spin_unlock(&xprt->recv_lock);
 	req = rpcr_to_rdmar(rqst);
 	req->rl_reply = rep;
+	rep->rr_rqst = rqst;
 
 	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(rep->rr_xid));
@@ -1280,36 +1333,7 @@ rpcrdma_reply_handler(struct work_struct *work)
 			&req->rl_registered);
 	}
 
-	xprt->reestablish_timeout = 0;
-
-	switch (rep->rr_proc) {
-	case rdma_msg:
-		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
-		break;
-	case rdma_nomsg:
-		status = rpcrdma_decode_nomsg(r_xprt, rep);
-		break;
-	case rdma_error:
-		status = rpcrdma_decode_error(r_xprt, rep, rqst);
-		break;
-	default:
-		status = -EIO;
-	}
-	if (status < 0)
-		goto out_badheader;
-
-out:
-	spin_lock(&xprt->recv_lock);
-	cwnd = xprt->cwnd;
-	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
-	if (xprt->cwnd > cwnd)
-		xprt_release_rqst_cong(rqst->rq_task);
-
-	xprt_complete_rqst(rqst->rq_task, status);
-	xprt_unpin_rqst(rqst);
-	spin_unlock(&xprt->recv_lock);
-	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-		__func__, xprt, rqst, status);
+	rpcrdma_complete_rqst(rep);
 	return;
 
 out_badstatus:
@@ -1325,20 +1349,8 @@ rpcrdma_reply_handler(struct work_struct *work)
 		__func__, be32_to_cpu(rep->rr_vers));
 	goto repost;
 
-/* If the incoming reply terminated a pending RPC, the next
- * RPC call will post a replacement receive buffer as it is
- * being marshaled.
- */
-out_badheader:
-	dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
-		rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
-	r_xprt->rx_stats.bad_reply_count++;
-	status = -EIO;
-	goto out;
-
-/* The req was still available, but by the time the recv_lock
- * was acquired, the rqst and task had been released. Thus the RPC
- * has already been terminated.
+/* The RPC transaction has already been terminated, or the header
+ * is corrupt.
  */
 out_norqst:
 	spin_unlock(&xprt->recv_lock);
@@ -1348,7 +1360,6 @@ rpcrdma_reply_handler(struct work_struct *work)
 
 out_shortreply:
 	dprintk("RPC: %s: short/invalid reply\n", __func__);
-	goto repost;
 
 /* If no pending RPC transaction was matched, post a replacement
  * receive buffer before returning.
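
Note: the following is an illustrative, user-space sketch of the control flow this patch creates, not kernel code. All names here (fake_rep, complete_rqst, decode_msg, CWNDSHIFT, rr_credits, rr_rqst_id) are invented stand-ins for the kernel structures and helpers; it only models the shape of rpcrdma_complete_rqst(): decode the transport header by message type, then refresh the congestion window from the server-granted credits and complete the request.

/* Simplified, single-threaded model of the refactored reply completion
 * path. Locking and the real xprt/rqst machinery are intentionally
 * omitted; types and helpers are invented for illustration only.
 */
#include <stdio.h>

enum rdma_proc { rdma_msg, rdma_nomsg, rdma_error };

struct fake_rep {                  /* stand-in for struct rpcrdma_rep  */
	enum rdma_proc rr_proc;    /* transport header message type    */
	unsigned int   rr_credits; /* credits granted by the server    */
	int            rr_rqst_id; /* stand-in for rep->rr_rqst        */
};

#define CWNDSHIFT 10               /* plays the role of RPC_CWNDSHIFT  */

/* Decode stubs: a real implementation parses the transport header. */
static int decode_msg(struct fake_rep *rep)   { (void)rep; return 0; }
static int decode_nomsg(struct fake_rep *rep) { (void)rep; return 0; }
static int decode_error(struct fake_rep *rep) { (void)rep; return -1; }

static void complete_rqst(struct fake_rep *rep, unsigned long *cwnd)
{
	unsigned long oldcwnd = *cwnd;
	int status;

	switch (rep->rr_proc) {
	case rdma_msg:   status = decode_msg(rep);   break;
	case rdma_nomsg: status = decode_nomsg(rep); break;
	case rdma_error: status = decode_error(rep); break;
	default:         status = -1;
	}

	/* Refresh the window from the granted credits, analogous to
	 * rb_credits << RPC_CWNDSHIFT in the kernel code. */
	*cwnd = (unsigned long)rep->rr_credits << CWNDSHIFT;
	if (*cwnd > oldcwnd)
		printf("window grew from %lu to %lu\n", oldcwnd, *cwnd);

	printf("request %d completed with status %d\n",
	       rep->rr_rqst_id, status);
}

int main(void)
{
	struct fake_rep rep = { rdma_msg, 32, 7 };
	unsigned long cwnd = 0;

	complete_rqst(&rep, &cwnd);
	return 0;
}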