Skip to content

Commit 01883ed

Browse files
sowminiv authored and davem330 committed
rds: support for zcopy completion notification
RDS removes a datagram (rds_message) from the retransmit queue when an ACK is received. The ACK indicates that the receiver has queued the RDS datagram, so that the sender can safely forget the datagram. When all references to the rds_message are quiesced, rds_message_purge is called to release resources used by the rds_message. If the datagram to be removed had pinned pages set up, add an entry to the rs->rs_znotify_queue so that the notification will be sent up via rds_rm_zerocopy_callback() when the rds_message is eventually freed by rds_message_purge. rds_rm_zerocopy_callback() attempts to batch the number of cookies sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES. This is achieved by checking the tail skb in the sk_error_queue: if this has room for one more cookie, the cookie from the current notification is added; else a new skb is added to the sk_error_queue. Every invocation of rds_rm_zerocopy_callback() will trigger a ->sk_error_report to notify the application. Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com> Acked-by: Willem de Bruijn <willemb@google.com> Signed-off-by: David S. Miller <davem@davemloft.net>
1 parent 2819075 commit 01883ed

File tree

5 files changed

+96
-7
lines changed

5 files changed

+96
-7
lines changed

include/uapi/linux/errqueue.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ struct sock_extended_err {
2020
#define SO_EE_ORIGIN_ICMP6 3
2121
#define SO_EE_ORIGIN_TXSTATUS 4
2222
#define SO_EE_ORIGIN_ZEROCOPY 5
23+
#define SO_EE_ORIGIN_ZCOOKIE 6
2324
#define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
2425

2526
#define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1))
2627

2728
#define SO_EE_CODE_ZEROCOPY_COPIED 1
29+
#define SO_EE_ORIGIN_MAX_ZCOOKIES 8
2830

2931
/**
3032
* struct scm_timestamping - timestamps exposed through cmsg

net/rds/af_rds.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
182182
mask |= (EPOLLIN | EPOLLRDNORM);
183183
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
184184
mask |= (EPOLLOUT | EPOLLWRNORM);
185+
if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
186+
mask |= POLLERR;
185187
read_unlock_irqrestore(&rs->rs_recv_lock, flags);
186188

187189
/* clear state any time we wake a seen-congested socket */

net/rds/message.c

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
#include <linux/kernel.h>
3434
#include <linux/slab.h>
3535
#include <linux/export.h>
36+
#include <linux/skbuff.h>
37+
#include <linux/list.h>
38+
#include <linux/errqueue.h>
3639

3740
#include "rds.h"
3841

@@ -53,29 +56,95 @@ void rds_message_addref(struct rds_message *rm)
5356
}
5457
EXPORT_SYMBOL_GPL(rds_message_addref);
5558

59+
/*
 * Append @cookie to the zerocopy-cookie notification carried by @skb.
 *
 * The number of cookies already stored is kept in ee_data of the skb's
 * extended error; each cookie is appended to the skb data as a u32.
 *
 * Returns true if the cookie was stored, false if @skb is not a
 * SO_EE_ORIGIN_ZCOOKIE notification or already carries
 * SO_EE_ORIGIN_MAX_ZCOOKIES cookies.
 */
static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
{
	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
	int count = serr->ee.ee_data;
	u32 *slot;

	if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE ||
	    count == SO_EE_ORIGIN_MAX_ZCOOKIES)
		return false;

	slot = skb_put(skb, sizeof(u32));
	*slot = cookie;
	serr->ee.ee_data = count + 1;
	return true;
}
75+
76+
static void rds_rm_zerocopy_callback(struct rds_sock *rs,
77+
struct rds_znotifier *znotif)
78+
{
79+
struct sock *sk = rds_rs_to_sk(rs);
80+
struct sk_buff *skb, *tail;
81+
struct sock_exterr_skb *serr;
82+
unsigned long flags;
83+
struct sk_buff_head *q;
84+
u32 cookie = znotif->z_cookie;
85+
86+
q = &sk->sk_error_queue;
87+
spin_lock_irqsave(&q->lock, flags);
88+
tail = skb_peek_tail(q);
89+
90+
if (tail && skb_zcookie_add(tail, cookie)) {
91+
spin_unlock_irqrestore(&q->lock, flags);
92+
mm_unaccount_pinned_pages(&znotif->z_mmp);
93+
consume_skb(rds_skb_from_znotifier(znotif));
94+
sk->sk_error_report(sk);
95+
return;
96+
}
97+
98+
skb = rds_skb_from_znotifier(znotif);
99+
serr = SKB_EXT_ERR(skb);
100+
memset(&serr->ee, 0, sizeof(serr->ee));
101+
serr->ee.ee_errno = 0;
102+
serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
103+
serr->ee.ee_info = 0;
104+
WARN_ON(!skb_zcookie_add(skb, cookie));
105+
106+
__skb_queue_tail(q, skb);
107+
108+
spin_unlock_irqrestore(&q->lock, flags);
109+
sk->sk_error_report(sk);
110+
111+
mm_unaccount_pinned_pages(&znotif->z_mmp);
112+
}
113+
56114
/*
57115
* This relies on dma_map_sg() not touching sg[].page during merging.
58116
*/
59117
static void rds_message_purge(struct rds_message *rm)
60118
{
61119
unsigned long i, flags;
120+
bool zcopy = false;
62121

63122
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
64123
return;
65124

66-
for (i = 0; i < rm->data.op_nents; i++) {
67-
rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
68-
/* XXX will have to put_page for page refs */
69-
__free_page(sg_page(&rm->data.op_sg[i]));
70-
}
71-
rm->data.op_nents = 0;
72125
spin_lock_irqsave(&rm->m_rs_lock, flags);
73126
if (rm->m_rs) {
74-
sock_put(rds_rs_to_sk(rm->m_rs));
127+
struct rds_sock *rs = rm->m_rs;
128+
129+
if (rm->data.op_mmp_znotifier) {
130+
zcopy = true;
131+
rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
132+
rm->data.op_mmp_znotifier = NULL;
133+
}
134+
sock_put(rds_rs_to_sk(rs));
75135
rm->m_rs = NULL;
76136
}
77137
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
78138

139+
for (i = 0; i < rm->data.op_nents; i++) {
140+
/* XXX will have to put_page for page refs */
141+
if (!zcopy)
142+
__free_page(sg_page(&rm->data.op_sg[i]));
143+
else
144+
put_page(sg_page(&rm->data.op_sg[i]));
145+
}
146+
rm->data.op_nents = 0;
147+
79148
if (rm->rdma.op_active)
80149
rds_rdma_free_op(&rm->rdma);
81150
if (rm->rdma.op_rdma_mr)

net/rds/rds.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
356356
#define RDS_MSG_PAGEVEC 7
357357
#define RDS_MSG_FLUSH 8
358358

359+
struct rds_znotifier {
360+
struct list_head z_list;
361+
struct mmpin z_mmp;
362+
u32 z_cookie;
363+
};
364+
365+
#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0]))
366+
367+
static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
368+
{
369+
return container_of((void *)z, struct sk_buff, cb);
370+
}
371+
359372
struct rds_message {
360373
refcount_t m_refcount;
361374
struct list_head m_sock_item;
@@ -436,6 +449,7 @@ struct rds_message {
436449
unsigned int op_count;
437450
unsigned int op_dmasg;
438451
unsigned int op_dmaoff;
452+
struct rds_znotifier *op_mmp_znotifier;
439453
struct scatterlist *op_sg;
440454
} data;
441455
};

net/rds/recv.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
594594

595595
if (msg_flags & MSG_OOB)
596596
goto out;
597+
if (msg_flags & MSG_ERRQUEUE)
598+
return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
597599

598600
while (1) {
599601
/* If there are pending notifications, do those - and nothing else */

0 commit comments

Comments
 (0)