Skip to content

Commit a4ed38d

Browse files
Douglas Fulleridryomov
authored andcommitted
libceph: support for CEPH_OSD_OP_LIST_WATCHERS
Add support for this Ceph OSD op, needed to support the RBD exclusive lock feature. Signed-off-by: Douglas Fuller <dfuller@redhat.com> [idryomov@gmail.com: refactor, misc fixes throughout] Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Reviewed-by: Mike Christie <mchristi@redhat.com> Reviewed-by: Alex Elder <elder@linaro.org>
1 parent f01d5cb commit a4ed38d

File tree

2 files changed

+131
-1
lines changed

2 files changed

+131
-1
lines changed

include/linux/ceph/osd_client.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ struct ceph_osd_req_op {
120120
struct ceph_osd_data request_data;
121121
struct ceph_osd_data response_data;
122122
} notify;
123+
struct {
124+
struct ceph_osd_data response_data;
125+
} list_watchers;
123126
struct {
124127
u64 expected_object_size;
125128
u64 expected_write_size;
@@ -249,6 +252,12 @@ struct ceph_osd_linger_request {
249252
size_t *preply_len;
250253
};
251254

255+
struct ceph_watch_item {
256+
struct ceph_entity_name name;
257+
u64 cookie;
258+
struct ceph_entity_addr addr;
259+
};
260+
252261
struct ceph_osd_client {
253262
struct ceph_client *client;
254263

@@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
346355
struct page **pages, u64 length,
347356
u32 alignment, bool pages_from_pool,
348357
bool own_pages);
349-
350358
extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
351359
unsigned int which, u16 opcode,
352360
const char *class, const char *method);
@@ -434,5 +442,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
434442
size_t *preply_len);
435443
int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
436444
struct ceph_osd_linger_request *lreq);
445+
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
446+
struct ceph_object_id *oid,
447+
struct ceph_object_locator *oloc,
448+
struct ceph_watch_item **watchers,
449+
u32 *num_watchers);
437450
#endif
438451

net/ceph/osd_client.c

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
338338
ceph_osd_data_release(&op->notify.request_data);
339339
ceph_osd_data_release(&op->notify.response_data);
340340
break;
341+
case CEPH_OSD_OP_LIST_WATCHERS:
342+
ceph_osd_data_release(&op->list_watchers.response_data);
343+
break;
341344
default:
342345
break;
343346
}
@@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
863866
case CEPH_OSD_OP_NOTIFY:
864867
dst->notify.cookie = cpu_to_le64(src->notify.cookie);
865868
break;
869+
case CEPH_OSD_OP_LIST_WATCHERS:
870+
break;
866871
case CEPH_OSD_OP_SETALLOCHINT:
867872
dst->alloc_hint.expected_object_size =
868873
cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req,
14451450
ceph_osdc_msg_data_add(req->r_reply,
14461451
&op->extent.osd_data);
14471452
break;
1453+
case CEPH_OSD_OP_LIST_WATCHERS:
1454+
ceph_osdc_msg_data_add(req->r_reply,
1455+
&op->list_watchers.response_data);
1456+
break;
14481457

14491458
/* both */
14501459
case CEPH_OSD_OP_CALL:
@@ -3891,6 +3900,114 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
38913900
return ret;
38923901
}
38933902

3903+
static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
3904+
{
3905+
u8 struct_v;
3906+
u32 struct_len;
3907+
int ret;
3908+
3909+
ret = ceph_start_decoding(p, end, 2, "watch_item_t",
3910+
&struct_v, &struct_len);
3911+
if (ret)
3912+
return ret;
3913+
3914+
ceph_decode_copy(p, &item->name, sizeof(item->name));
3915+
item->cookie = ceph_decode_64(p);
3916+
*p += 4; /* skip timeout_seconds */
3917+
if (struct_v >= 2) {
3918+
ceph_decode_copy(p, &item->addr, sizeof(item->addr));
3919+
ceph_decode_addr(&item->addr);
3920+
}
3921+
3922+
dout("%s %s%llu cookie %llu addr %s\n", __func__,
3923+
ENTITY_NAME(item->name), item->cookie,
3924+
ceph_pr_addr(&item->addr.in_addr));
3925+
return 0;
3926+
}
3927+
3928+
static int decode_watchers(void **p, void *end,
3929+
struct ceph_watch_item **watchers,
3930+
u32 *num_watchers)
3931+
{
3932+
u8 struct_v;
3933+
u32 struct_len;
3934+
int i;
3935+
int ret;
3936+
3937+
ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
3938+
&struct_v, &struct_len);
3939+
if (ret)
3940+
return ret;
3941+
3942+
*num_watchers = ceph_decode_32(p);
3943+
*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
3944+
if (!*watchers)
3945+
return -ENOMEM;
3946+
3947+
for (i = 0; i < *num_watchers; i++) {
3948+
ret = decode_watcher(p, end, *watchers + i);
3949+
if (ret) {
3950+
kfree(*watchers);
3951+
return ret;
3952+
}
3953+
}
3954+
3955+
return 0;
3956+
}
3957+
3958+
/*
3959+
* On success, the caller is responsible for:
3960+
*
3961+
* kfree(watchers);
3962+
*/
3963+
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
3964+
struct ceph_object_id *oid,
3965+
struct ceph_object_locator *oloc,
3966+
struct ceph_watch_item **watchers,
3967+
u32 *num_watchers)
3968+
{
3969+
struct ceph_osd_request *req;
3970+
struct page **pages;
3971+
int ret;
3972+
3973+
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3974+
if (!req)
3975+
return -ENOMEM;
3976+
3977+
ceph_oid_copy(&req->r_base_oid, oid);
3978+
ceph_oloc_copy(&req->r_base_oloc, oloc);
3979+
req->r_flags = CEPH_OSD_FLAG_READ;
3980+
3981+
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3982+
if (ret)
3983+
goto out_put_req;
3984+
3985+
pages = ceph_alloc_page_vector(1, GFP_NOIO);
3986+
if (IS_ERR(pages)) {
3987+
ret = PTR_ERR(pages);
3988+
goto out_put_req;
3989+
}
3990+
3991+
osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
3992+
ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
3993+
response_data),
3994+
pages, PAGE_SIZE, 0, false, true);
3995+
3996+
ceph_osdc_start_request(osdc, req, false);
3997+
ret = ceph_osdc_wait_request(osdc, req);
3998+
if (ret >= 0) {
3999+
void *p = page_address(pages[0]);
4000+
void *const end = p + req->r_ops[0].outdata_len;
4001+
4002+
ret = decode_watchers(&p, end, watchers, num_watchers);
4003+
}
4004+
4005+
out_put_req:
4006+
ceph_osdc_put_request(req);
4007+
return ret;
4008+
}
4009+
EXPORT_SYMBOL(ceph_osdc_list_watchers);
4010+
38944011
/*
38954012
* Call all pending notify callbacks - for use after a watch is
38964013
* unregistered, to make sure no more callbacks for it will be invoked

0 commit comments

Comments
 (0)