
Commit 95f7691

Author: Trond Myklebust
SUNRPC: Convert xprt receive queue to use an rbtree
If the server is slow, we can find ourselves with quite a lot of entries on the receive queue. Converting the search from O(n) to O(log(n)) can make a significant difference, particularly since we have to hold a number of locks while searching.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
1 parent bd79bc5 commit 95f7691
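
The search in question is the XID lookup in xprt_lookup_rqst(). Below is a minimal userspace sketch (not kernel code) that mirrors the find/insert shape of the new xprt_request_rb_*() helpers in the diff: it uses a plain unbalanced binary search tree and uint32_t keys instead of the kernel rbtree and __be32, and all names in it are hypothetical, purely to illustrate the keyed O(log n) lookup that replaces the O(n) list walk.

/*
 * Userspace sketch only: an XID-keyed binary search tree with the same
 * find/insert shape as the new xprt_request_rb_*() helpers. Hypothetical
 * names; unbalanced BST and uint32_t keys stand in for the kernel rbtree
 * and __be32.
 */
#include <stdint.h>
#include <stdio.h>

struct rqst {
	uint32_t xid;
	struct rqst *left, *right;	/* stands in for struct rb_node rq_recv */
};

/* Walk the tree by key: O(log n) when the tree is balanced. */
static struct rqst *rqst_find(struct rqst *root, uint32_t xid)
{
	while (root != NULL) {
		if (xid == root->xid)
			return root;
		root = (xid < root->xid) ? root->left : root->right;
	}
	return NULL;
}

/* Descend to the correct leaf slot and link the new node there. */
static void rqst_insert(struct rqst **rootp, struct rqst *new)
{
	while (*rootp != NULL) {
		if (new->xid == (*rootp)->xid)
			return;		/* duplicate XID: already queued */
		rootp = (new->xid < (*rootp)->xid) ?
			&(*rootp)->left : &(*rootp)->right;
	}
	new->left = new->right = NULL;
	*rootp = new;
}

int main(void)
{
	struct rqst reqs[4] = { { .xid = 7 }, { .xid = 3 }, { .xid = 42 }, { .xid = 19 } };
	struct rqst *root = NULL;

	for (int i = 0; i < 4; i++)
		rqst_insert(&root, &reqs[i]);

	printf("xid 19 %s\n", rqst_find(root, 19) ? "found" : "not found");
	printf("xid 99 %s\n", rqst_find(root, 99) ? "found" : "not found");
	return 0;
}

The kernel version builds on rb_entry()/rb_link_node()/rb_insert_color() so the tree stays balanced; the sketch skips rebalancing for brevity.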

2 files changed, 84 insertions(+), 13 deletions(-)


include/linux/sunrpc/xprt.h

Lines changed: 2 additions & 2 deletions
@@ -85,7 +85,7 @@ struct rpc_rqst {
 
 	union {
 		struct list_head	rq_list;	/* Slot allocation list */
-		struct list_head	rq_recv;	/* Receive queue */
+		struct rb_node		rq_recv;	/* Receive queue */
 	};
 
 	struct list_head	rq_xmit;	/* Send queue */
@@ -260,7 +260,7 @@ struct rpc_xprt {
 					 * backchannel rpc_rqst's */
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-	struct list_head	recv_queue;	/* Receive queue */
+	struct rb_root		recv_queue;	/* Receive queue */
 
 	struct {
 		unsigned long		bind_count,	/* total number of binds */
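
Note that rq_recv stays inside the same union as rq_list, which appears to rely on a request never being on the slot-allocation list and in the receive queue at the same time, so the two links can share storage. A toy userspace illustration of that layout (hypothetical names, assuming that mutual exclusion holds):

/*
 * Userspace toy, hypothetical names: two link types sharing storage via a
 * union, assuming an object is only ever linked into one structure at a time.
 */
#include <stdio.h>

struct list_link { struct list_link *next, *prev; };
struct tree_link { struct tree_link *parent, *left, *right; };

struct request {
	union {
		struct list_link free_link;	/* while on the free/slot list */
		struct tree_link recv_link;	/* while waiting for a reply */
	};
	unsigned int xid;
};

int main(void)
{
	/* The union costs max(sizeof members), not the sum. */
	printf("sizeof(struct request) = %zu\n", sizeof(struct request));
	return 0;
}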

net/sunrpc/xprt.c

Lines changed: 82 additions & 11 deletions
@@ -753,7 +753,7 @@ static void
 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
 	__must_hold(&xprt->transport_lock)
 {
-	if (list_empty(&xprt->recv_queue) && xprt_has_timer(xprt))
+	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
 		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
 }
 
@@ -763,7 +763,7 @@ xprt_init_autodisconnect(struct timer_list *t)
 	struct rpc_xprt *xprt = from_timer(xprt, t, timer);
 
 	spin_lock(&xprt->transport_lock);
-	if (!list_empty(&xprt->recv_queue))
+	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
 		goto out_abort;
 	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
 	xprt->last_used = jiffies;
@@ -880,6 +880,75 @@ static void xprt_connect_status(struct rpc_task *task)
 	}
 }
 
+enum xprt_xid_rb_cmp {
+	XID_RB_EQUAL,
+	XID_RB_LEFT,
+	XID_RB_RIGHT,
+};
+static enum xprt_xid_rb_cmp
+xprt_xid_cmp(__be32 xid1, __be32 xid2)
+{
+	if (xid1 == xid2)
+		return XID_RB_EQUAL;
+	if ((__force u32)xid1 < (__force u32)xid2)
+		return XID_RB_LEFT;
+	return XID_RB_RIGHT;
+}
+
+static struct rpc_rqst *
+xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
+{
+	struct rb_node *n = xprt->recv_queue.rb_node;
+	struct rpc_rqst *req;
+
+	while (n != NULL) {
+		req = rb_entry(n, struct rpc_rqst, rq_recv);
+		switch (xprt_xid_cmp(xid, req->rq_xid)) {
+		case XID_RB_LEFT:
+			n = n->rb_left;
+			break;
+		case XID_RB_RIGHT:
+			n = n->rb_right;
+			break;
+		case XID_RB_EQUAL:
+			return req;
+		}
+	}
+	return NULL;
+}
+
+static void
+xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
+{
+	struct rb_node **p = &xprt->recv_queue.rb_node;
+	struct rb_node *n = NULL;
+	struct rpc_rqst *req;
+
+	while (*p != NULL) {
+		n = *p;
+		req = rb_entry(n, struct rpc_rqst, rq_recv);
+		switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
+		case XID_RB_LEFT:
+			p = &n->rb_left;
+			break;
+		case XID_RB_RIGHT:
+			p = &n->rb_right;
+			break;
+		case XID_RB_EQUAL:
+			WARN_ON_ONCE(new != req);
+			return;
+		}
+	}
+	rb_link_node(&new->rq_recv, n, p);
+	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
+}
+
+static void
+xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	rb_erase(&req->rq_recv, &xprt->recv_queue);
+}
+
 /**
  * xprt_lookup_rqst - find an RPC request corresponding to an XID
  * @xprt: transport on which the original request was transmitted
@@ -891,12 +960,12 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 {
 	struct rpc_rqst *entry;
 
-	list_for_each_entry(entry, &xprt->recv_queue, rq_recv)
-		if (entry->rq_xid == xid) {
-			trace_xprt_lookup_rqst(xprt, xid, 0);
-			entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
-			return entry;
-		}
+	entry = xprt_request_rb_find(xprt, xid);
+	if (entry != NULL) {
+		trace_xprt_lookup_rqst(xprt, xid, 0);
+		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
+		return entry;
+	}
 
 	dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
 			ntohl(xid));
@@ -981,7 +1050,7 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 			sizeof(req->rq_private_buf));
 
 	/* Add request to the receive list */
-	list_add_tail(&req->rq_recv, &xprt->recv_queue);
+	xprt_request_rb_insert(xprt, req);
 	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
 	spin_unlock(&xprt->queue_lock);
 
@@ -999,8 +1068,10 @@ xprt_request_enqueue_receive(struct rpc_task *task)
 static void
 xprt_request_dequeue_receive_locked(struct rpc_task *task)
 {
+	struct rpc_rqst *req = task->tk_rqstp;
+
 	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
-		list_del(&task->tk_rqstp->rq_recv);
+		xprt_request_rb_remove(req->rq_xprt, req);
 }
 
 /**
@@ -1711,7 +1782,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 	spin_lock_init(&xprt->queue_lock);
 
 	INIT_LIST_HEAD(&xprt->free);
-	INIT_LIST_HEAD(&xprt->recv_queue);
+	xprt->recv_queue = RB_ROOT;
 	INIT_LIST_HEAD(&xprt->xmit_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	spin_lock_init(&xprt->bc_pa_lock);
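
One detail in xprt_xid_cmp() above: the __be32 XIDs are compared in their raw network-byte-order representation via a (__force u32) cast, with no byte swapping. That is enough for a search tree, which only needs a consistent total order over the keys, not host-numeric order. A small standalone userspace sketch of that point (hypothetical names):

/*
 * Userspace sketch: comparing 32-bit XIDs in their big-endian (network
 * byte order) representation still yields a strict total order, which is
 * all a search tree needs; the order simply differs from host order on
 * little-endian machines.
 */
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

static int xid_cmp_raw(uint32_t be_a, uint32_t be_b)
{
	if (be_a == be_b)
		return 0;
	return be_a < be_b ? -1 : 1;	/* compare the raw big-endian words */
}

int main(void)
{
	uint32_t a = htonl(0x00000002);
	uint32_t b = htonl(0x01000001);

	/* Host order says 2 < 0x01000001; the raw order may disagree on a
	 * little-endian host, but it is still consistent and antisymmetric,
	 * so lookups that use the same comparison always find their node. */
	printf("raw cmp(a, b) = %d, cmp(b, a) = %d\n",
	       xid_cmp_raw(a, b), xid_cmp_raw(b, a));
	return 0;
}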
