Skip to content

Commit 8b9558a

Browse files
ukernel authored and idryomov committed
libceph: use keepalive2 to verify the mon session is alive
Signed-off-by: Yan, Zheng <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
1 parent d194cd1 commit 8b9558a

File tree

6 files changed

+93
-14
lines changed

6 files changed

+93
-14
lines changed

include/linux/ceph/libceph.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@ struct ceph_options {
4646
unsigned long mount_timeout; /* jiffies */
4747
unsigned long osd_idle_ttl; /* jiffies */
4848
unsigned long osd_keepalive_timeout; /* jiffies */
49+
unsigned long monc_ping_timeout; /* jiffies */
4950

5051
/*
5152
* any type that can't be simply compared or doesn't need
@@ -66,6 +67,7 @@ struct ceph_options {
6667
#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000)
6768
#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000)
6869
#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000)
70+
#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000)
6971

7072
#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
7173
#define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024)

include/linux/ceph/messenger.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ struct ceph_connection {
248248
int in_base_pos; /* bytes read */
249249
__le64 in_temp_ack; /* for reading an ack */
250250

251+
struct timespec last_keepalive_ack;
252+
251253
struct delayed_work work; /* send|recv work */
252254
unsigned long delay; /* current delay interval */
253255
};
@@ -285,6 +287,8 @@ extern void ceph_msg_revoke(struct ceph_msg *msg);
285287
extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
286288

287289
extern void ceph_con_keepalive(struct ceph_connection *con);
290+
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
291+
unsigned long interval);
288292

289293
extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
290294
size_t length, size_t alignment);

include/linux/ceph/msgr.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,12 @@ struct ceph_entity_inst {
8484
#define CEPH_MSGR_TAG_MSG 7 /* message */
8585
#define CEPH_MSGR_TAG_ACK 8 /* message ack */
8686
#define CEPH_MSGR_TAG_KEEPALIVE 9 /* just a keepalive byte! */
87-
#define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */
87+
#define CEPH_MSGR_TAG_BADPROTOVER 10 /* bad protocol version */
8888
#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
8989
#define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */
9090
#define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */
91+
#define CEPH_MSGR_TAG_KEEPALIVE2 14 /* keepalive2 byte + ceph_timespec */
92+
#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15 /* keepalive2 reply */
9193

9294

9395
/*

net/ceph/ceph_common.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name,
357357
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
358358
opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
359359
opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
360+
opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
360361

361362
/* get mon ip(s) */
362363
/* ip1[:port1][,ip2[:port2]...] */

net/ceph/messenger.c

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache;
163163
static char tag_msg = CEPH_MSGR_TAG_MSG;
164164
static char tag_ack = CEPH_MSGR_TAG_ACK;
165165
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
166+
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
166167

167168
#ifdef CONFIG_LOCKDEP
168169
static struct lock_class_key socket_class;
@@ -1351,7 +1352,15 @@ static void prepare_write_keepalive(struct ceph_connection *con)
13511352
{
13521353
dout("prepare_write_keepalive %p\n", con);
13531354
con_out_kvec_reset(con);
1354-
con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
1355+
if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
1356+
struct timespec ts = CURRENT_TIME;
1357+
struct ceph_timespec ceph_ts;
1358+
ceph_encode_timespec(&ceph_ts, &ts);
1359+
con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
1360+
con_out_kvec_add(con, sizeof(ceph_ts), &ceph_ts);
1361+
} else {
1362+
con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
1363+
}
13551364
con_flag_set(con, CON_FLAG_WRITE_PENDING);
13561365
}
13571366

@@ -1625,6 +1634,12 @@ static void prepare_read_tag(struct ceph_connection *con)
16251634
con->in_tag = CEPH_MSGR_TAG_READY;
16261635
}
16271636

1637+
static void prepare_read_keepalive_ack(struct ceph_connection *con)
1638+
{
1639+
dout("prepare_read_keepalive_ack %p\n", con);
1640+
con->in_base_pos = 0;
1641+
}
1642+
16281643
/*
16291644
* Prepare to read a message.
16301645
*/
@@ -2457,6 +2472,17 @@ static void process_message(struct ceph_connection *con)
24572472
mutex_lock(&con->mutex);
24582473
}
24592474

2475+
static int read_keepalive_ack(struct ceph_connection *con)
2476+
{
2477+
struct ceph_timespec ceph_ts;
2478+
size_t size = sizeof(ceph_ts);
2479+
int ret = read_partial(con, size, size, &ceph_ts);
2480+
if (ret <= 0)
2481+
return ret;
2482+
ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
2483+
prepare_read_tag(con);
2484+
return 1;
2485+
}
24602486

24612487
/*
24622488
* Write something to the socket. Called in a worker thread when the
@@ -2526,6 +2552,10 @@ static int try_write(struct ceph_connection *con)
25262552

25272553
do_next:
25282554
if (con->state == CON_STATE_OPEN) {
2555+
if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
2556+
prepare_write_keepalive(con);
2557+
goto more;
2558+
}
25292559
/* is anything else pending? */
25302560
if (!list_empty(&con->out_queue)) {
25312561
prepare_write_message(con);
@@ -2535,10 +2565,6 @@ static int try_write(struct ceph_connection *con)
25352565
prepare_write_ack(con);
25362566
goto more;
25372567
}
2538-
if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
2539-
prepare_write_keepalive(con);
2540-
goto more;
2541-
}
25422568
}
25432569

25442570
/* Nothing to do! */
@@ -2641,6 +2667,9 @@ static int try_read(struct ceph_connection *con)
26412667
case CEPH_MSGR_TAG_ACK:
26422668
prepare_read_ack(con);
26432669
break;
2670+
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
2671+
prepare_read_keepalive_ack(con);
2672+
break;
26442673
case CEPH_MSGR_TAG_CLOSE:
26452674
con_close_socket(con);
26462675
con->state = CON_STATE_CLOSED;
@@ -2684,6 +2713,12 @@ static int try_read(struct ceph_connection *con)
26842713
process_ack(con);
26852714
goto more;
26862715
}
2716+
if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
2717+
ret = read_keepalive_ack(con);
2718+
if (ret <= 0)
2719+
goto out;
2720+
goto more;
2721+
}
26872722

26882723
out:
26892724
dout("try_read done on %p ret %d\n", con, ret);
@@ -3101,6 +3136,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
31013136
}
31023137
EXPORT_SYMBOL(ceph_con_keepalive);
31033138

3139+
bool ceph_con_keepalive_expired(struct ceph_connection *con,
3140+
unsigned long interval)
3141+
{
3142+
if (interval > 0 &&
3143+
(con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
3144+
struct timespec now = CURRENT_TIME;
3145+
struct timespec ts;
3146+
jiffies_to_timespec(interval, &ts);
3147+
ts = timespec_add(con->last_keepalive_ack, ts);
3148+
return timespec_compare(&now, &ts) >= 0;
3149+
}
3150+
return false;
3151+
}
3152+
31043153
static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
31053154
{
31063155
struct ceph_msg_data *data;

net/ceph/mon_client.c

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc)
149149
CEPH_ENTITY_TYPE_MON, monc->cur_mon,
150150
&monc->monmap->mon_inst[monc->cur_mon].addr);
151151

152+
/* send an initial keepalive to ensure our timestamp is
153+
* valid by the time we are in an OPENED state */
154+
ceph_con_keepalive(&monc->con);
155+
152156
/* initiatiate authentication handshake */
153157
ret = ceph_auth_build_hello(monc->auth,
154158
monc->m_auth->front.iov_base,
@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc)
170174
*/
171175
static void __schedule_delayed(struct ceph_mon_client *monc)
172176
{
173-
unsigned int delay;
177+
struct ceph_options *opt = monc->client->options;
178+
unsigned long delay;
174179

175-
if (monc->cur_mon < 0 || __sub_expired(monc))
180+
if (monc->cur_mon < 0 || __sub_expired(monc)) {
176181
delay = 10 * HZ;
177-
else
182+
} else {
178183
delay = 20 * HZ;
179-
dout("__schedule_delayed after %u\n", delay);
180-
schedule_delayed_work(&monc->delayed_work, delay);
184+
if (opt->monc_ping_timeout > 0)
185+
delay = min(delay, opt->monc_ping_timeout / 3);
186+
}
187+
dout("__schedule_delayed after %lu\n", delay);
188+
schedule_delayed_work(&monc->delayed_work,
189+
round_jiffies_relative(delay));
181190
}
182191

183192
/*
@@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work)
743752
__close_session(monc);
744753
__open_session(monc); /* continue hunting */
745754
} else {
746-
ceph_con_keepalive(&monc->con);
755+
struct ceph_options *opt = monc->client->options;
756+
int is_auth = ceph_auth_is_authenticated(monc->auth);
757+
if (ceph_con_keepalive_expired(&monc->con,
758+
opt->monc_ping_timeout)) {
759+
dout("monc keepalive timeout\n");
760+
is_auth = 0;
761+
__close_session(monc);
762+
monc->hunting = true;
763+
__open_session(monc);
764+
}
747765

748-
__validate_auth(monc);
766+
if (!monc->hunting) {
767+
ceph_con_keepalive(&monc->con);
768+
__validate_auth(monc);
769+
}
749770

750-
if (ceph_auth_is_authenticated(monc->auth))
771+
if (is_auth)
751772
__send_subscribe(monc);
752773
}
753774
__schedule_delayed(monc);

0 commit comments

Comments
 (0)