Skip to content

Commit 6644ed7

Browse files
Alex ElderSage Weil
authored andcommitted
libceph: make message data be a pointer
Begin the transition from a single message data item to a list of them by replacing the "data" structure in a message with a pointer to a ceph_msg_data structure. A null pointer will indicate the message has no data; replace the use of ceph_msg_has_data() with a simple check for a null pointer. Create functions ceph_msg_data_create() and ceph_msg_data_destroy() to dynamically allocate and free a data item structure of a given type. When a message has its data item "set," allocate one of these to hold the data description, and free it when the last reference to the message is dropped. This partially resolves: http://tracker.ceph.com/issues/4429 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
1 parent 8ea299b commit 6644ed7

File tree

2 files changed

+62
-37
lines changed

2 files changed

+62
-37
lines changed

include/linux/ceph/messenger.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ struct ceph_messenger {
6464
u32 required_features;
6565
};
6666

67-
#define ceph_msg_has_data(m) ((m)->data.type != CEPH_MSG_DATA_NONE)
68-
6967
enum ceph_msg_data_type {
7068
CEPH_MSG_DATA_NONE, /* message contains no data payload */
7169
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
@@ -141,8 +139,7 @@ struct ceph_msg {
141139
struct kvec front; /* unaligned blobs of message */
142140
struct ceph_buffer *middle;
143141

144-
/* data payload */
145-
struct ceph_msg_data data;
142+
struct ceph_msg_data *data; /* data payload */
146143

147144
struct ceph_connection *con;
148145
struct list_head list_head; /* links for connection lists */

net/ceph/messenger.c

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
10861086

10871087
/* Initialize data cursor */
10881088

1089-
ceph_msg_data_cursor_init(&msg->data, data_len);
1089+
ceph_msg_data_cursor_init(msg->data, data_len);
10901090
}
10911091

10921092
/*
@@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
14061406
static int write_partial_message_data(struct ceph_connection *con)
14071407
{
14081408
struct ceph_msg *msg = con->out_msg;
1409-
struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
1409+
struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
14101410
bool do_datacrc = !con->msgr->nocrc;
14111411
u32 crc;
14121412

14131413
dout("%s %p msg %p\n", __func__, con, msg);
14141414

1415-
if (WARN_ON(!ceph_msg_has_data(msg)))
1415+
if (WARN_ON(!msg->data))
14161416
return -EINVAL;
14171417

14181418
/*
@@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con)
14321432
bool need_crc;
14331433
int ret;
14341434

1435-
page = ceph_msg_data_next(&msg->data, &page_offset, &length,
1435+
page = ceph_msg_data_next(msg->data, &page_offset, &length,
14361436
&last_piece);
14371437
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
14381438
length, last_piece);
@@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con)
14441444
}
14451445
if (do_datacrc && cursor->need_crc)
14461446
crc = ceph_crc32c_page(crc, page, page_offset, length);
1447-
need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret);
1447+
need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
14481448
}
14491449

14501450
dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con,
21042104
static int read_partial_msg_data(struct ceph_connection *con)
21052105
{
21062106
struct ceph_msg *msg = con->in_msg;
2107-
struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
2107+
struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
21082108
const bool do_datacrc = !con->msgr->nocrc;
21092109
struct page *page;
21102110
size_t page_offset;
@@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
21132113
int ret;
21142114

21152115
BUG_ON(!msg);
2116-
if (WARN_ON(!ceph_msg_has_data(msg)))
2116+
if (!msg->data)
21172117
return -EIO;
21182118

21192119
if (do_datacrc)
21202120
crc = con->in_data_crc;
21212121
while (cursor->resid) {
2122-
page = ceph_msg_data_next(&msg->data, &page_offset, &length,
2122+
page = ceph_msg_data_next(msg->data, &page_offset, &length,
21232123
NULL);
21242124
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
21252125
if (ret <= 0) {
@@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
21312131

21322132
if (do_datacrc)
21332133
crc = ceph_crc32c_page(crc, page, page_offset, ret);
2134-
(void) ceph_msg_data_advance(&msg->data, (size_t) ret);
2134+
(void) ceph_msg_data_advance(msg->data, (size_t)ret);
21352135
}
21362136
if (do_datacrc)
21372137
con->in_data_crc = crc;
@@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
29472947
}
29482948
EXPORT_SYMBOL(ceph_con_keepalive);
29492949

2950-
static void ceph_msg_data_init(struct ceph_msg_data *data)
2950+
static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
29512951
{
2952-
data->type = CEPH_MSG_DATA_NONE;
2952+
struct ceph_msg_data *data;
2953+
2954+
if (WARN_ON(!ceph_msg_data_type_valid(type)))
2955+
return NULL;
2956+
2957+
data = kzalloc(sizeof (*data), GFP_NOFS);
2958+
if (data)
2959+
data->type = type;
2960+
2961+
return data;
2962+
}
2963+
2964+
static void ceph_msg_data_destroy(struct ceph_msg_data *data)
2965+
{
2966+
if (!data)
2967+
return;
2968+
2969+
if (data->type == CEPH_MSG_DATA_PAGELIST) {
2970+
ceph_pagelist_release(data->pagelist);
2971+
kfree(data->pagelist);
2972+
}
2973+
kfree(data);
29532974
}
29542975

29552976
void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
29562977
size_t length, size_t alignment)
29572978
{
2979+
struct ceph_msg_data *data;
2980+
29582981
BUG_ON(!pages);
29592982
BUG_ON(!length);
2960-
BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
2983+
BUG_ON(msg->data != NULL);
2984+
2985+
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
2986+
BUG_ON(!data);
2987+
data->pages = pages;
2988+
data->length = length;
2989+
data->alignment = alignment & ~PAGE_MASK;
29612990

2962-
msg->data.type = CEPH_MSG_DATA_PAGES;
2963-
msg->data.pages = pages;
2964-
msg->data.length = length;
2965-
msg->data.alignment = alignment & ~PAGE_MASK;
2991+
msg->data = data;
29662992
}
29672993
EXPORT_SYMBOL(ceph_msg_data_set_pages);
29682994

29692995
void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
29702996
struct ceph_pagelist *pagelist)
29712997
{
2998+
struct ceph_msg_data *data;
2999+
29723000
BUG_ON(!pagelist);
29733001
BUG_ON(!pagelist->length);
2974-
BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
3002+
BUG_ON(msg->data != NULL);
29753003

2976-
msg->data.type = CEPH_MSG_DATA_PAGELIST;
2977-
msg->data.pagelist = pagelist;
3004+
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
3005+
BUG_ON(!data);
3006+
data->pagelist = pagelist;
3007+
3008+
msg->data = data;
29783009
}
29793010
EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
29803011

29813012
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
29823013
{
3014+
struct ceph_msg_data *data;
3015+
29833016
BUG_ON(!bio);
2984-
BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
3017+
BUG_ON(msg->data != NULL);
29853018

2986-
msg->data.type = CEPH_MSG_DATA_BIO;
2987-
msg->data.bio = bio;
3019+
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
3020+
BUG_ON(!data);
3021+
data->bio = bio;
3022+
3023+
msg->data = data;
29883024
}
29893025
EXPORT_SYMBOL(ceph_msg_data_set_bio);
29903026

@@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
30083044
INIT_LIST_HEAD(&m->list_head);
30093045
kref_init(&m->kref);
30103046

3011-
ceph_msg_data_init(&m->data);
3012-
30133047
/* front */
30143048
m->front_max = front_len;
30153049
if (front_len) {
@@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref)
31633197
ceph_buffer_put(m->middle);
31643198
m->middle = NULL;
31653199
}
3166-
if (ceph_msg_has_data(m)) {
3167-
if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
3168-
ceph_pagelist_release(m->data.pagelist);
3169-
kfree(m->data.pagelist);
3170-
}
3171-
memset(&m->data, 0, sizeof m->data);
3172-
ceph_msg_data_init(&m->data);
3173-
}
3200+
ceph_msg_data_destroy(m->data);
3201+
m->data = NULL;
31743202

31753203
if (m->pool)
31763204
ceph_msgpool_put(m->pool, m);
@@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
31823210
void ceph_msg_dump(struct ceph_msg *msg)
31833211
{
31843212
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
3185-
msg->front_max, msg->data.length);
3213+
msg->front_max, msg->data->length);
31863214
print_hex_dump(KERN_DEBUG, "header: ",
31873215
DUMP_PREFIX_OFFSET, 16, 1,
31883216
&msg->hdr, sizeof(msg->hdr), true);

0 commit comments

Comments
 (0)