Skip to content

Commit 50100a5

Browse files
Jon Paul Maloy authored and davem330 committed
tipc: use pseudo message to wake up sockets after link congestion
The current link implementation keeps a linked list of blocked ports/sockets that is populated when there is link congestion. The purpose of this is to let the link know which users to wake up when the congestion abates.

This adds unnecessary complexity to the data structure and the code, since it forces us to involve the link each time we want to delete a socket. It also forces us to grab the spinlock port_lock within the scope of node_lock. We want to get rid of this direct dependence, as well as the deadlock hazard resulting from the usage of port_lock.

In this commit, we instead let the link keep a list of "wakeup" pseudo messages for use in such situations. Those messages are sent to the pending sockets via the ordinary message reception path, and wake up the socket's owner when they are received.

This enables us to get rid of the 'waiting_ports' linked lists in struct tipc_port that manifest this direct reference. As a consequence, we can eliminate another BH entry into the socket, and hence the need to grab port_lock. This is a further step in our effort to remove port_lock altogether.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
1 parent 1dd0bd2 commit 50100a5

File tree

12 files changed

+99
-91
lines changed

12 files changed

+99
-91
lines changed

net/tipc/bcast.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
300300
tipc_link_push_queue(bcl);
301301
bclink_set_last_sent();
302302
}
303-
if (unlikely(released && !list_empty(&bcl->waiting_ports)))
304-
tipc_link_wakeup_ports(bcl, 0);
303+
if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
304+
bclink->node.action_flags |= TIPC_WAKEUP_USERS;
305305
exit:
306306
tipc_bclink_unlock();
307307
}
@@ -840,9 +840,10 @@ int tipc_bclink_init(void)
840840
sprintf(bcbearer->media.name, "tipc-broadcast");
841841

842842
spin_lock_init(&bclink->lock);
843-
INIT_LIST_HEAD(&bcl->waiting_ports);
843+
__skb_queue_head_init(&bcl->waiting_sks);
844844
bcl->next_out_no = 1;
845845
spin_lock_init(&bclink->node.lock);
846+
__skb_queue_head_init(&bclink->node.waiting_sks);
846847
bcl->owner = &bclink->node;
847848
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
848849
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);

net/tipc/core.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,11 @@ static inline void k_term_timer(struct timer_list *timer)
187187

188188
struct tipc_skb_cb {
189189
void *handle;
190-
bool deferred;
191190
struct sk_buff *tail;
191+
bool deferred;
192+
bool wakeup_pending;
193+
u16 chain_sz;
194+
u16 chain_imp;
192195
};
193196

194197
#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))

net/tipc/link.c

Lines changed: 54 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
275275
link_init_max_pkt(l_ptr);
276276

277277
l_ptr->next_out_no = 1;
278-
INIT_LIST_HEAD(&l_ptr->waiting_ports);
278+
__skb_queue_head_init(&l_ptr->waiting_sks);
279279

280280
link_reset_statistics(l_ptr);
281281

@@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
322322
}
323323

324324
/**
325-
* link_schedule_port - schedule port for deferred sending
326-
* @l_ptr: pointer to link
327-
* @origport: reference to sending port
328-
* @sz: amount of data to be sent
329-
*
330-
* Schedules port for renewed sending of messages after link congestion
331-
* has abated.
325+
* link_schedule_user - schedule user for wakeup after congestion
326+
* @link: congested link
327+
* @oport: sending port
328+
* @chain_sz: size of buffer chain that was attempted sent
329+
* @imp: importance of message attempted sent
330+
* Create pseudo msg to send back to user when congestion abates
332331
*/
333-
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
332+
static bool link_schedule_user(struct tipc_link *link, u32 oport,
333+
uint chain_sz, uint imp)
334334
{
335-
struct tipc_port *p_ptr;
336-
struct tipc_sock *tsk;
335+
struct sk_buff *buf;
337336

338-
spin_lock_bh(&tipc_port_list_lock);
339-
p_ptr = tipc_port_lock(origport);
340-
if (p_ptr) {
341-
if (!list_empty(&p_ptr->wait_list))
342-
goto exit;
343-
tsk = tipc_port_to_sock(p_ptr);
344-
tsk->link_cong = 1;
345-
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
346-
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
347-
l_ptr->stats.link_congs++;
348-
exit:
349-
tipc_port_unlock(p_ptr);
350-
}
351-
spin_unlock_bh(&tipc_port_list_lock);
352-
return -ELINKCONG;
337+
buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
338+
tipc_own_addr, oport, 0, 0);
339+
if (!buf)
340+
return false;
341+
TIPC_SKB_CB(buf)->chain_sz = chain_sz;
342+
TIPC_SKB_CB(buf)->chain_imp = imp;
343+
__skb_queue_tail(&link->waiting_sks, buf);
344+
link->stats.link_congs++;
345+
return true;
353346
}
354347

355-
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
348+
/**
349+
* link_prepare_wakeup - prepare users for wakeup after congestion
350+
* @link: congested link
351+
* Move a number of waiting users, as permitted by available space in
352+
* the send queue, from link wait queue to node wait queue for wakeup
353+
*/
354+
static void link_prepare_wakeup(struct tipc_link *link)
356355
{
357-
struct tipc_port *p_ptr;
358-
struct tipc_sock *tsk;
359-
struct tipc_port *temp_p_ptr;
360-
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
361-
362-
if (all)
363-
win = 100000;
364-
if (win <= 0)
365-
return;
366-
if (!spin_trylock_bh(&tipc_port_list_lock))
367-
return;
368-
if (link_congested(l_ptr))
369-
goto exit;
370-
list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
371-
wait_list) {
372-
if (win <= 0)
356+
struct sk_buff_head *wq = &link->waiting_sks;
357+
struct sk_buff *buf;
358+
uint pend_qsz = link->out_queue_size;
359+
360+
for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
361+
if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
373362
break;
374-
tsk = tipc_port_to_sock(p_ptr);
375-
list_del_init(&p_ptr->wait_list);
376-
spin_lock_bh(p_ptr->lock);
377-
tsk->link_cong = 0;
378-
tipc_sock_wakeup(tsk);
379-
win -= p_ptr->waiting_pkts;
380-
spin_unlock_bh(p_ptr->lock);
363+
pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
364+
__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
381365
}
382-
383-
exit:
384-
spin_unlock_bh(&tipc_port_list_lock);
385366
}
386367

387368
/**
@@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
423404
u32 prev_state = l_ptr->state;
424405
u32 checkpoint = l_ptr->next_in_no;
425406
int was_active_link = tipc_link_is_active(l_ptr);
407+
struct tipc_node *owner = l_ptr->owner;
426408

427409
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
428410

@@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
450432
kfree_skb(l_ptr->proto_msg_queue);
451433
l_ptr->proto_msg_queue = NULL;
452434
kfree_skb_list(l_ptr->oldest_deferred_in);
453-
if (!list_empty(&l_ptr->waiting_ports))
454-
tipc_link_wakeup_ports(l_ptr, 1);
455-
435+
if (!skb_queue_empty(&l_ptr->waiting_sks)) {
436+
skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
437+
owner->action_flags |= TIPC_WAKEUP_USERS;
438+
}
456439
l_ptr->retransm_queue_head = 0;
457440
l_ptr->retransm_queue_size = 0;
458441
l_ptr->last_out = NULL;
@@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
688671
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
689672
{
690673
struct tipc_msg *msg = buf_msg(buf);
691-
uint psz = msg_size(msg);
692674
uint imp = tipc_msg_tot_importance(msg);
693675
u32 oport = msg_tot_origport(msg);
694676

695-
if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
696-
if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
697-
link_schedule_port(link, oport, psz);
698-
return -ELINKCONG;
699-
}
700-
} else {
677+
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
701678
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
702679
tipc_link_reset(link);
680+
goto drop;
703681
}
682+
if (unlikely(msg_errcode(msg)))
683+
goto drop;
684+
if (unlikely(msg_reroute_cnt(msg)))
685+
goto drop;
686+
if (TIPC_SKB_CB(buf)->wakeup_pending)
687+
return -ELINKCONG;
688+
if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
689+
return -ELINKCONG;
690+
drop:
704691
kfree_skb_list(buf);
705692
return -EHOSTUNREACH;
706693
}
@@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
12021189
if (unlikely(l_ptr->next_out))
12031190
tipc_link_push_queue(l_ptr);
12041191

1205-
if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1206-
tipc_link_wakeup_ports(l_ptr, 0);
1192+
if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
1193+
link_prepare_wakeup(l_ptr);
1194+
l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1195+
}
12071196

12081197
/* Process the incoming packet */
12091198
if (unlikely(!link_working_working(l_ptr))) {

net/tipc/link.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* net/tipc/link.h: Include file for TIPC link code
33
*
4-
* Copyright (c) 1995-2006, 2013, Ericsson AB
4+
* Copyright (c) 1995-2006, 2013-2014, Ericsson AB
55
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems
66
* All rights reserved.
77
*
@@ -133,7 +133,7 @@ struct tipc_stats {
133133
* @retransm_queue_size: number of messages to retransmit
134134
* @retransm_queue_head: sequence number of first message to retransmit
135135
* @next_out: ptr to first unsent outbound message in queue
136-
* @waiting_ports: linked list of ports waiting for link congestion to abate
136+
* @waiting_sks: linked list of sockets waiting for link congestion to abate
137137
* @long_msg_seq_no: next identifier to use for outbound fragmented messages
138138
* @reasm_buf: head of partially reassembled inbound message fragments
139139
* @stats: collects statistics regarding link activity
@@ -194,7 +194,7 @@ struct tipc_link {
194194
u32 retransm_queue_size;
195195
u32 retransm_queue_head;
196196
struct sk_buff *next_out;
197-
struct list_head waiting_ports;
197+
struct sk_buff_head waiting_sks;
198198

199199
/* Fragmentation/reassembly */
200200
u32 long_msg_seq_no;
@@ -235,7 +235,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
235235
void tipc_link_push_queue(struct tipc_link *l_ptr);
236236
u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
237237
struct sk_buff *buf);
238-
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all);
239238
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
240239
void tipc_link_retransmit(struct tipc_link *l_ptr,
241240
struct sk_buff *start, u32 retransmits);

net/tipc/msg.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
182182
struct sk_buff *buf, *prev;
183183
char *pktpos;
184184
int rc;
185-
185+
uint chain_sz = 0;
186186
msg_set_size(mhdr, msz);
187187

188188
/* No fragmentation needed? */
@@ -193,6 +193,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
193193
return -ENOMEM;
194194
skb_copy_to_linear_data(buf, mhdr, mhsz);
195195
pktpos = buf->data + mhsz;
196+
TIPC_SKB_CB(buf)->chain_sz = 1;
196197
if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
197198
return dsz;
198199
rc = -EFAULT;
@@ -209,6 +210,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
209210
*chain = buf = tipc_buf_acquire(pktmax);
210211
if (!buf)
211212
return -ENOMEM;
213+
chain_sz = 1;
212214
pktpos = buf->data;
213215
skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
214216
pktpos += INT_H_SIZE;
@@ -242,6 +244,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
242244
rc = -ENOMEM;
243245
goto error;
244246
}
247+
chain_sz++;
245248
prev->next = buf;
246249
msg_set_type(&pkthdr, FRAGMENT);
247250
msg_set_size(&pkthdr, pktsz);
@@ -251,7 +254,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
251254
pktrem = pktsz - INT_H_SIZE;
252255

253256
} while (1);
254-
257+
TIPC_SKB_CB(*chain)->chain_sz = chain_sz;
255258
msg_set_type(buf_msg(buf), LAST_FRAGMENT);
256259
return dsz;
257260
error:

net/tipc/msg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
442442
#define NAME_DISTRIBUTOR 11
443443
#define MSG_FRAGMENTER 12
444444
#define LINK_CONFIG 13
445+
#define SOCK_WAKEUP 14 /* pseudo user */
445446

446447
/*
447448
* Connection management protocol message types

net/tipc/node.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "config.h"
3939
#include "node.h"
4040
#include "name_distr.h"
41+
#include "socket.h"
4142

4243
#define NODE_HTABLE_SIZE 512
4344

@@ -100,6 +101,7 @@ struct tipc_node *tipc_node_create(u32 addr)
100101
INIT_HLIST_NODE(&n_ptr->hash);
101102
INIT_LIST_HEAD(&n_ptr->list);
102103
INIT_LIST_HEAD(&n_ptr->nsub);
104+
__skb_queue_head_init(&n_ptr->waiting_sks);
103105

104106
hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
105107

@@ -474,13 +476,19 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
474476
void tipc_node_unlock(struct tipc_node *node)
475477
{
476478
LIST_HEAD(nsub_list);
479+
struct sk_buff_head waiting_sks;
477480
u32 addr = 0;
478481

479482
if (likely(!node->action_flags)) {
480483
spin_unlock_bh(&node->lock);
481484
return;
482485
}
483486

487+
__skb_queue_head_init(&waiting_sks);
488+
if (node->action_flags & TIPC_WAKEUP_USERS) {
489+
skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
490+
node->action_flags &= ~TIPC_WAKEUP_USERS;
491+
}
484492
if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
485493
list_replace_init(&node->nsub, &nsub_list);
486494
node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
@@ -491,8 +499,12 @@ void tipc_node_unlock(struct tipc_node *node)
491499
}
492500
spin_unlock_bh(&node->lock);
493501

502+
while (!skb_queue_empty(&waiting_sks))
503+
tipc_sk_rcv(__skb_dequeue(&waiting_sks));
504+
494505
if (!list_empty(&nsub_list))
495506
tipc_nodesub_notify(&nsub_list);
507+
496508
if (addr)
497509
tipc_named_node_up(addr);
498510
}

net/tipc/node.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ enum {
5858
TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
5959
TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
6060
TIPC_NOTIFY_NODE_DOWN = (1 << 3),
61-
TIPC_NOTIFY_NODE_UP = (1 << 4)
61+
TIPC_NOTIFY_NODE_UP = (1 << 4),
62+
TIPC_WAKEUP_USERS = (1 << 5)
6263
};
6364

6465
/**
@@ -115,6 +116,7 @@ struct tipc_node {
115116
int working_links;
116117
u32 signature;
117118
struct list_head nsub;
119+
struct sk_buff_head waiting_sks;
118120
struct rcu_head rcu;
119121
};
120122

net/tipc/port.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
9292

9393
p_ptr->max_pkt = MAX_PKT_DEFAULT;
9494
p_ptr->ref = ref;
95-
INIT_LIST_HEAD(&p_ptr->wait_list);
9695
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
9796
k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
9897
INIT_LIST_HEAD(&p_ptr->publications);
@@ -134,7 +133,6 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
134133
}
135134
spin_lock_bh(&tipc_port_list_lock);
136135
list_del(&p_ptr->port_list);
137-
list_del(&p_ptr->wait_list);
138136
spin_unlock_bh(&tipc_port_list_lock);
139137
k_term_timer(&p_ptr->timer);
140138
}

net/tipc/port.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
* @ref: unique reference to port in TIPC object registry
5959
* @phdr: preformatted message header used when sending messages
6060
* @port_list: adjacent ports in TIPC's global list of ports
61-
* @wait_list: adjacent ports in list of ports waiting on link congestion
62-
* @waiting_pkts:
6361
* @publications: list of publications for port
6462
* @pub_count: total # of publications port has made during its lifetime
6563
* @probing_state:
@@ -77,8 +75,6 @@ struct tipc_port {
7775
u32 ref;
7876
struct tipc_msg phdr;
7977
struct list_head port_list;
80-
struct list_head wait_list;
81-
u32 waiting_pkts;
8278
struct list_head publications;
8379
u32 pub_count;
8480
u32 probing_state;

0 commit comments

Comments
 (0)