
Commit 6d2d0ee

chucklever authored and amschuma-ntap committed
xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue
To address a connection-close ordering problem, we need the ability to
drain the RPC completions running on rpcrdma_receive_wq for just one
transport. Give each transport its own RPC completion workqueue, and
drain that workqueue when disconnecting the transport.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
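
For orientation, the drain helper this patch adds to net/sunrpc/xprtrdma/verbs.c (shown in full in the diff below) enforces the close ordering in three steps. The following is a condensed, comment-annotated restatement of that helper using only names from the patch itself, not additional functionality:

static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	/* Step 1: flush posted Receives; after this, no new deferred
	 * Reply work is queued for this transport.
	 */
	ib_drain_qp(ia->ri_id->qp);

	/* Step 2: wait for Reply work already queued on this
	 * transport's completion workqueue to finish.
	 */
	drain_workqueue(buf->rb_completion_wq);

	/* Step 3: deferred Reply processing might have scheduled
	 * local invalidations, so drain the Send queue as well.
	 */
	ib_drain_sq(ia->ri_id->qp);
}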
1 parent 6ceea36 commit 6d2d0ee

4 files changed: +44, -48 lines


net/sunrpc/xprtrdma/rpc_rdma.c

Lines changed: 1 addition & 1 deletion
@@ -1356,7 +1356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	queue_work(buf->rb_completion_wq, &rep->rr_work);
 	return;
 
 out_badversion:

net/sunrpc/xprtrdma/transport.c

Lines changed: 6 additions & 11 deletions
@@ -444,10 +444,14 @@ xprt_rdma_close(struct rpc_xprt *xprt)
 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
+	might_sleep();
+
 	dprintk("RPC: %s: closing xprt %p\n", __func__, xprt);
 
+	/* Prevent marshaling and sending of new requests */
+	xprt_clear_connected(xprt);
+
 	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-		xprt_clear_connected(xprt);
 		rpcrdma_ia_remove(ia);
 		return;
 	}
@@ -858,8 +862,6 @@ void xprt_rdma_cleanup(void)
 		dprintk("RPC: %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
-	rpcrdma_destroy_wq();
-
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
 		dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
@@ -870,20 +872,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = rpcrdma_alloc_wq();
-	if (rc)
-		return rc;
-
 	rc = xprt_register_transport(&xprt_rdma);
-	if (rc) {
-		rpcrdma_destroy_wq();
+	if (rc)
 		return rc;
-	}
 
 	rc = xprt_register_transport(&xprt_rdma_bc);
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
-		rpcrdma_destroy_wq();
 		return rc;
 	}
 

net/sunrpc/xprtrdma/verbs.c

Lines changed: 36 additions & 31 deletions
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-	struct workqueue_struct *recv_wq;
-
-	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
-				  0);
-	if (!recv_wq)
-		return -ENOMEM;
-
-	rpcrdma_receive_wq = recv_wq;
-	return 0;
-}
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-void
-rpcrdma_destroy_wq(void)
-{
-	struct workqueue_struct *wq;
+	/* Flush Receives, then wait for deferred Reply work
+	 * to complete.
+	 */
+	ib_drain_qp(ia->ri_id->qp);
+	drain_workqueue(buf->rb_completion_wq);
 
-	if (rpcrdma_receive_wq) {
-		wq = rpcrdma_receive_wq;
-		rpcrdma_receive_wq = NULL;
-		destroy_workqueue(wq);
-	}
+	/* Deferred Reply processing might have scheduled
+	 * local invalidations.
+	 */
+	ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 	 * connection is already gone.
 	 */
 	if (ia->ri_id->qp) {
-		ib_drain_qp(ia->ri_id->qp);
+		rpcrdma_xprt_drain(r_xprt);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -825,8 +815,10 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+						   rx_ep);
 	int rc;
 
+	/* returns without wait if ID is not connected */
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc)
-		/* returns without wait if not connected */
 		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
 	else
 		ep->rep_connected = rc;
-	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-					       rx_ep), rc);
+	trace_xprtrdma_disconnect(r_xprt, rc);
 
-	ib_drain_qp(ia->ri_id->qp);
+	rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	if (rc)
 		goto out;
 
+	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+						WQ_MEM_RECLAIM | WQ_HIGHPRI,
+						0,
+			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+	if (!buf->rb_completion_wq)
+		goto out;
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+	if (buf->rb_completion_wq) {
+		destroy_workqueue(buf->rb_completion_wq);
+		buf->rb_completion_wq = NULL;
+	}
+
 	rpcrdma_sendctxs_destroy(buf);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
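
One detail worth noting from the rpcrdma_buffer_create() hunk above: alloc_workqueue() takes a printf-style format string plus arguments, which is how each transport's completion workqueue gets a distinct name. A minimal illustrative sketch follows; the helper name and address string are hypothetical and not part of the patch:

#include <linux/workqueue.h>

/* Hypothetical helper: builds a per-transport workqueue the same way
 * rpcrdma_buffer_create() does above. With addr = "192.0.2.1" the
 * resulting workqueue is named "rpcrdma-192.0.2.1". WQ_MEM_RECLAIM and
 * WQ_HIGHPRI match the flags of the old global rpcrdma_receive_wq.
 */
static struct workqueue_struct *example_completion_wq(const char *addr)
{
	return alloc_workqueue("rpcrdma-%s",
			       WQ_MEM_RECLAIM | WQ_HIGHPRI, 0, addr);
}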

net/sunrpc/xprtrdma/xprt_rdma.h

Lines changed: 1 addition & 5 deletions
@@ -412,6 +412,7 @@ struct rpcrdma_buffer {
 
 	u32			rb_bc_max_requests;
 
+	struct workqueue_struct *rb_completion_wq;
 	struct delayed_work	rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -547,8 +548,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
-extern struct workqueue_struct *rpcrdma_receive_wq;
-
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
@@ -603,9 +602,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 	return __rpcrdma_dma_map_regbuf(ia, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
