Skip to content

Commit 80c6d2b

Browse files
committed
Merge branch 'RDS-zerocopy-support'
Sowmini Varadhan says: ==================== RDS: zerocopy support This is version 3 of the series, following up on review comments for http://patchwork.ozlabs.org/project/netdev/list/?series=28530 Review comments addressed Patch 4 - fix fragile use of skb->cb[], do not set ee_code incorrectly. Patch 5: - remove needless bzero of skb->cb[], consolidate err cleanup A brief overview of this feature follows. This patch series provides support for MSG_ZEROCOPY on a PF_RDS socket based on the APIs and infrastructure added by Commit f214f91 ("tcp: enable MSG_ZEROCOPY") For single threaded rds-stress testing using rds-tcp with the ixgbe driver using 1M message sizes (-a 1M -q 1M) preliminary results show that there is a significant reduction in latency: about 90 usec with zerocopy, compared with 200 usec without zerocopy. This patchset modifies the above for zerocopy in the following manner. - if the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and, - if the SO_ZEROCOPY socket option has been set on the PF_RDS socket, application pages sent down with rds_sendmsg are pinned. The pinning uses the accounting infrastructure added by a91dbff ("sock: ulimit on MSG_ZEROCOPY pages"). The message is unpinned when all references to the message go down to 0, and the message is freed by rds_message_purge. A multithreaded application using this infrastructure must send down a unique 32 bit cookie as ancillary data with each sendmsg invocation. The format of this ancillary data is described in Patch 5 of the series. The cookie is passed up to the application on the sk_error_queue when the message is unpinned, indicating to the application that it is now safe to free/reuse the message buffer. The details of the completion notification are provided in Patch 4 of this series. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
2 parents ee99b2d + dfb8434 commit 80c6d2b

File tree

11 files changed

+339
-35
lines changed

11 files changed

+339
-35
lines changed

include/linux/skbuff.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,9 @@ struct ubuf_info {
466466

467467
#define skb_uarg(SKB) ((struct ubuf_info *)(skb_shinfo(SKB)->destructor_arg))
468468

469+
int mm_account_pinned_pages(struct mmpin *mmp, size_t size);
470+
void mm_unaccount_pinned_pages(struct mmpin *mmp);
471+
469472
struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size);
470473
struct ubuf_info *sock_zerocopy_realloc(struct sock *sk, size_t size,
471474
struct ubuf_info *uarg);

include/uapi/linux/errqueue.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ struct sock_extended_err {
2020
#define SO_EE_ORIGIN_ICMP6 3
2121
#define SO_EE_ORIGIN_TXSTATUS 4
2222
#define SO_EE_ORIGIN_ZEROCOPY 5
23+
#define SO_EE_ORIGIN_ZCOOKIE 6
2324
#define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
2425

2526
#define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1))
2627

2728
#define SO_EE_CODE_ZEROCOPY_COPIED 1
29+
#define SO_EE_ORIGIN_MAX_ZCOOKIES 8
2830

2931
/**
3032
* struct scm_timestamping - timestamps exposed through cmsg

include/uapi/linux/rds.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
#define RDS_CMSG_MASKED_ATOMIC_FADD 8
104104
#define RDS_CMSG_MASKED_ATOMIC_CSWP 9
105105
#define RDS_CMSG_RXPATH_LATENCY 11
106+
#define RDS_CMSG_ZCOPY_COOKIE 12
106107

107108
#define RDS_INFO_FIRST 10000
108109
#define RDS_INFO_COUNTERS 10000

net/core/skbuff.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ struct sk_buff *skb_morph(struct sk_buff *dst, struct sk_buff *src)
890890
}
891891
EXPORT_SYMBOL_GPL(skb_morph);
892892

893-
static int mm_account_pinned_pages(struct mmpin *mmp, size_t size)
893+
int mm_account_pinned_pages(struct mmpin *mmp, size_t size)
894894
{
895895
unsigned long max_pg, num_pg, new_pg, old_pg;
896896
struct user_struct *user;
@@ -919,14 +919,16 @@ static int mm_account_pinned_pages(struct mmpin *mmp, size_t size)
919919

920920
return 0;
921921
}
922+
EXPORT_SYMBOL_GPL(mm_account_pinned_pages);
922923

923-
static void mm_unaccount_pinned_pages(struct mmpin *mmp)
924+
void mm_unaccount_pinned_pages(struct mmpin *mmp)
924925
{
925926
if (mmp->user) {
926927
atomic_long_sub(mmp->num_pg, &mmp->user->locked_vm);
927928
free_uid(mmp->user);
928929
}
929930
}
931+
EXPORT_SYMBOL_GPL(mm_unaccount_pinned_pages);
930932

931933
struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size)
932934
{

net/core/sock.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,18 +1049,21 @@ int sock_setsockopt(struct socket *sock, int level, int optname,
10491049
break;
10501050

10511051
case SO_ZEROCOPY:
1052-
if (sk->sk_family != PF_INET && sk->sk_family != PF_INET6)
1052+
if (sk->sk_family == PF_INET || sk->sk_family == PF_INET6) {
1053+
if (sk->sk_protocol != IPPROTO_TCP)
1054+
ret = -ENOTSUPP;
1055+
else if (sk->sk_state != TCP_CLOSE)
1056+
ret = -EBUSY;
1057+
} else if (sk->sk_family != PF_RDS) {
10531058
ret = -ENOTSUPP;
1054-
else if (sk->sk_protocol != IPPROTO_TCP)
1055-
ret = -ENOTSUPP;
1056-
else if (sk->sk_state != TCP_CLOSE)
1057-
ret = -EBUSY;
1058-
else if (val < 0 || val > 1)
1059-
ret = -EINVAL;
1060-
else
1061-
sock_valbool_flag(sk, SOCK_ZEROCOPY, valbool);
1062-
break;
1063-
1059+
}
1060+
if (!ret) {
1061+
if (val < 0 || val > 1)
1062+
ret = -EINVAL;
1063+
else
1064+
sock_valbool_flag(sk, SOCK_ZEROCOPY, valbool);
1065+
break;
1066+
}
10641067
default:
10651068
ret = -ENOPROTOOPT;
10661069
break;

net/rds/af_rds.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
182182
mask |= (EPOLLIN | EPOLLRDNORM);
183183
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
184184
mask |= (EPOLLOUT | EPOLLWRNORM);
185+
if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
186+
mask |= POLLERR;
185187
read_unlock_irqrestore(&rs->rs_recv_lock, flags);
186188

187189
/* clear state any time we wake a seen-congested socket */

net/rds/message.c

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
#include <linux/kernel.h>
3434
#include <linux/slab.h>
3535
#include <linux/export.h>
36+
#include <linux/skbuff.h>
37+
#include <linux/list.h>
38+
#include <linux/errqueue.h>
3639

3740
#include "rds.h"
3841

@@ -53,20 +56,92 @@ void rds_message_addref(struct rds_message *rm)
5356
}
5457
EXPORT_SYMBOL_GPL(rds_message_addref);
5558

59+
/*
 * Try to append one zerocopy cookie to a notification skb.
 *
 * @skb:    candidate skb whose extended-error area describes the notification
 * @cookie: 32-bit cookie to store in the skb's data area
 *
 * The cookie is accepted only if @skb carries a SO_EE_ORIGIN_ZCOOKIE
 * notification and its running count (kept in ee_data) has not yet reached
 * SO_EE_ORIGIN_MAX_ZCOOKIES.  On success the cookie is written into freshly
 * reserved tail space and ee_data is bumped by one.
 *
 * Returns true if the cookie was stored, false if @skb was left untouched.
 */
static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
{
	struct sock_exterr_skb *err = SKB_EXT_ERR(skb);
	int count = err->ee.ee_data;
	u32 *slot;

	if (err->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE ||
	    count == SO_EE_ORIGIN_MAX_ZCOOKIES)
		return false;

	/* Grow the data area by one cookie and fill the new slot. */
	slot = skb_put(skb, sizeof(u32));
	*slot = cookie;
	err->ee.ee_data = count + 1;

	return true;
}
75+
76+
static void rds_rm_zerocopy_callback(struct rds_sock *rs,
77+
struct rds_znotifier *znotif)
78+
{
79+
struct sock *sk = rds_rs_to_sk(rs);
80+
struct sk_buff *skb, *tail;
81+
struct sock_exterr_skb *serr;
82+
unsigned long flags;
83+
struct sk_buff_head *q;
84+
u32 cookie = znotif->z_cookie;
85+
86+
q = &sk->sk_error_queue;
87+
spin_lock_irqsave(&q->lock, flags);
88+
tail = skb_peek_tail(q);
89+
90+
if (tail && skb_zcookie_add(tail, cookie)) {
91+
spin_unlock_irqrestore(&q->lock, flags);
92+
mm_unaccount_pinned_pages(&znotif->z_mmp);
93+
consume_skb(rds_skb_from_znotifier(znotif));
94+
sk->sk_error_report(sk);
95+
return;
96+
}
97+
98+
skb = rds_skb_from_znotifier(znotif);
99+
serr = SKB_EXT_ERR(skb);
100+
memset(&serr->ee, 0, sizeof(serr->ee));
101+
serr->ee.ee_errno = 0;
102+
serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
103+
serr->ee.ee_info = 0;
104+
WARN_ON(!skb_zcookie_add(skb, cookie));
105+
106+
__skb_queue_tail(q, skb);
107+
108+
spin_unlock_irqrestore(&q->lock, flags);
109+
sk->sk_error_report(sk);
110+
111+
mm_unaccount_pinned_pages(&znotif->z_mmp);
112+
}
113+
56114
/*
57115
* This relies on dma_map_sg() not touching sg[].page during merging.
58116
*/
59117
static void rds_message_purge(struct rds_message *rm)
60118
{
61-
unsigned long i;
119+
unsigned long i, flags;
120+
bool zcopy = false;
62121

63122
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
64123
return;
65124

125+
spin_lock_irqsave(&rm->m_rs_lock, flags);
126+
if (rm->m_rs) {
127+
struct rds_sock *rs = rm->m_rs;
128+
129+
if (rm->data.op_mmp_znotifier) {
130+
zcopy = true;
131+
rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
132+
rm->data.op_mmp_znotifier = NULL;
133+
}
134+
sock_put(rds_rs_to_sk(rs));
135+
rm->m_rs = NULL;
136+
}
137+
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
138+
66139
for (i = 0; i < rm->data.op_nents; i++) {
67-
rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
68140
/* XXX will have to put_page for page refs */
69-
__free_page(sg_page(&rm->data.op_sg[i]));
141+
if (!zcopy)
142+
__free_page(sg_page(&rm->data.op_sg[i]));
143+
else
144+
put_page(sg_page(&rm->data.op_sg[i]));
70145
}
71146
rm->data.op_nents = 0;
72147

@@ -266,12 +341,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
266341
return rm;
267342
}
268343

269-
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
344+
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
345+
bool zcopy)
270346
{
271347
unsigned long to_copy, nbytes;
272348
unsigned long sg_off;
273349
struct scatterlist *sg;
274350
int ret = 0;
351+
int length = iov_iter_count(from);
275352

276353
rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
277354

@@ -281,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
281358
sg = rm->data.op_sg;
282359
sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
283360

361+
if (zcopy) {
362+
int total_copied = 0;
363+
struct sk_buff *skb;
364+
365+
skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
366+
GFP_KERNEL);
367+
if (!skb)
368+
return -ENOMEM;
369+
rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
370+
if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
371+
length)) {
372+
ret = -ENOMEM;
373+
goto err;
374+
}
375+
while (iov_iter_count(from)) {
376+
struct page *pages;
377+
size_t start;
378+
ssize_t copied;
379+
380+
copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
381+
1, &start);
382+
if (copied < 0) {
383+
struct mmpin *mmp;
384+
int i;
385+
386+
for (i = 0; i < rm->data.op_nents; i++)
387+
put_page(sg_page(&rm->data.op_sg[i]));
388+
mmp = &rm->data.op_mmp_znotifier->z_mmp;
389+
mm_unaccount_pinned_pages(mmp);
390+
ret = -EFAULT;
391+
goto err;
392+
}
393+
total_copied += copied;
394+
iov_iter_advance(from, copied);
395+
length -= copied;
396+
sg_set_page(sg, pages, copied, start);
397+
rm->data.op_nents++;
398+
sg++;
399+
}
400+
WARN_ON_ONCE(length != 0);
401+
return ret;
402+
err:
403+
consume_skb(skb);
404+
rm->data.op_mmp_znotifier = NULL;
405+
return ret;
406+
} /* zcopy */
407+
284408
while (iov_iter_count(from)) {
285409
if (!sg_page(sg)) {
286410
ret = rds_page_remainder_alloc(sg, iov_iter_count(from),

net/rds/rds.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
356356
#define RDS_MSG_PAGEVEC 7
357357
#define RDS_MSG_FLUSH 8
358358

359+
struct rds_znotifier {
360+
struct list_head z_list;
361+
struct mmpin z_mmp;
362+
u32 z_cookie;
363+
};
364+
365+
#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0]))
366+
367+
static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
368+
{
369+
return container_of((void *)z, struct sk_buff, cb);
370+
}
371+
359372
struct rds_message {
360373
refcount_t m_refcount;
361374
struct list_head m_sock_item;
@@ -436,6 +449,7 @@ struct rds_message {
436449
unsigned int op_count;
437450
unsigned int op_dmasg;
438451
unsigned int op_dmaoff;
452+
struct rds_znotifier *op_mmp_znotifier;
439453
struct scatterlist *op_sg;
440454
} data;
441455
};
@@ -771,7 +785,8 @@ rds_conn_connecting(struct rds_connection *conn)
771785
/* message.c */
772786
struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
773787
struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
774-
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from);
788+
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
789+
bool zcopy);
775790
struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
776791
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
777792
__be16 dport, u64 seq);

net/rds/recv.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
594594

595595
if (msg_flags & MSG_OOB)
596596
goto out;
597+
if (msg_flags & MSG_ERRQUEUE)
598+
return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
597599

598600
while (1) {
599601
/* If there are pending notifications, do those - and nothing else */

0 commit comments

Comments
 (0)