Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/bcast.c | 204 |
| -rw-r--r-- | net/tipc/bcast.h | 33 |
| -rw-r--r-- | net/tipc/bearer.c | 15 |
| -rw-r--r-- | net/tipc/bearer.h | 8 |
| -rw-r--r-- | net/tipc/link.c | 87 |
| -rw-r--r-- | net/tipc/msg.c | 17 |
| -rw-r--r-- | net/tipc/msg.h | 11 |
| -rw-r--r-- | net/tipc/name_table.c | 128 |
| -rw-r--r-- | net/tipc/name_table.h | 24 |
| -rw-r--r-- | net/tipc/net.c | 4 |
| -rw-r--r-- | net/tipc/node.c | 42 |
| -rw-r--r-- | net/tipc/node.h | 4 |
| -rw-r--r-- | net/tipc/socket.c | 525 |
| -rw-r--r-- | net/tipc/udp_media.c | 8 |
14 files changed, 695 insertions, 415 deletions
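The bulk of the change replaces TIPC's unconditional broadcast transmit with a per-message choice between true broadcast and replicast (a unicast copy per destination node). Before reading the full diff below, the selection heuristic introduced in bcast.c (tipc_bcbase_calc_bc_threshold() and tipc_bcast_select_xmit_method()) can be distilled into the following C sketch. This is a reader's reconstruction for illustration only, not the kernel code itself, and the demo_* names are invented here.

```c
#include <stdbool.h>

/* Threshold recomputed whenever a broadcast peer is added or removed
 * (tipc_bcbase_calc_bc_threshold()); rc_ratio defaults to 25, set in
 * tipc_bcast_init().
 */
int demo_bc_threshold(int cluster_size, int rc_ratio)
{
	return 1 + (cluster_size * rc_ratio / 100);
}

/* Roughly mirrors tipc_bcast_select_xmit_method(): bearer and peer
 * capabilities override the heuristic, and a user-mandated method
 * (the TIPC_MCAST_* socket options in socket.c) overrides everything.
 * The real code also rate-limits method flapping via method->expires,
 * which this sketch omits.
 */
bool demo_use_replicast(bool bearer_bcast_support, bool peers_rcast_support,
			bool user_mandated, bool current_rcast,
			int dests, int threshold)
{
	if (!bearer_bcast_support)
		return true;		/* broadcast impossible: replicate */
	if (!peers_rcast_support)
		return false;		/* some peer cannot handle replicast */
	if (user_mandated)
		return current_rcast;	/* user pinned the method */
	return dests <= threshold;	/* few destinations: replicate */
}
```

In other words, small destination sets default to replicated unicast and large ones to a single broadcast frame, with bearer capability, peer capability and explicit user choice taking precedence, in that order.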
| diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index aa1babbea385..7d99029df342 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@  /*   * net/tipc/bcast.c: TIPC broadcast code   * - * Copyright (c) 2004-2006, 2014-2015, Ericsson AB + * Copyright (c) 2004-2006, 2014-2016, Ericsson AB   * Copyright (c) 2004, Intel Corporation.   * Copyright (c) 2005, 2010-2011, Wind River Systems   * All rights reserved. @@ -39,9 +39,8 @@  #include "socket.h"  #include "msg.h"  #include "bcast.h" -#include "name_distr.h"  #include "link.h" -#include "node.h" +#include "name_table.h"  #define	BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */  #define	BCLINK_WIN_MIN	        32	/* bcast minimum link window size */ @@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";   * @inputq: data input queue; will only carry SOCK_WAKEUP messages   * @dest: array keeping number of reachable destinations per bearer   * @primary_bearer: a bearer having links to all broadcast destinations, if any + * @bcast_support: indicates if primary bearer, if any, supports broadcast + * @rcast_support: indicates if all peer nodes support replicast + * @rc_ratio: dest count as percentage of cluster size where send method changes + * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast   */  struct tipc_bc_base {  	struct tipc_link *link;  	struct sk_buff_head inputq;  	int dests[MAX_BEARERS];  	int primary_bearer; +	bool bcast_support; +	bool rcast_support; +	int rc_ratio; +	int bc_threshold;  };  static struct tipc_bc_base *tipc_bc_base(struct net *net) @@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)  int tipc_bcast_get_mtu(struct net *net)  { -	return tipc_link_mtu(tipc_bc_sndlink(net)); +	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; +} + +void tipc_bcast_disable_rcast(struct net *net) +{ +	tipc_bc_base(net)->rcast_support = false; +} + +static void tipc_bcbase_calc_bc_threshold(struct net *net) +{ +	struct tipc_bc_base *bb = tipc_bc_base(net); +	int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net)); + +	bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);  }  /* tipc_bcbase_select_primary(): find a bearer with links to all destinations, @@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)  {  	struct tipc_bc_base *bb = tipc_bc_base(net);  	int all_dests =  tipc_link_bc_peers(bb->link); -	int i, mtu; +	int i, mtu, prim;  	bb->primary_bearer = INVALID_BEARER_ID; +	bb->bcast_support = true;  	if (!all_dests)  		return; @@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)  		mtu = tipc_bearer_mtu(net, i);  		if (mtu < tipc_link_mtu(bb->link))  			tipc_link_set_mtu(bb->link, mtu); - +		bb->bcast_support &= tipc_bearer_bcast_support(net, i);  		if (bb->dests[i] < all_dests)  			continue; @@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)  		if ((i ^ tipc_own_addr(net)) & 1)  			break;  	} +	prim = bb->primary_bearer; +	if (prim != INVALID_BEARER_ID) +		bb->bcast_support = tipc_bearer_bcast_support(net, prim);  }  void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) @@ -170,45 +194,131 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)  	__skb_queue_purge(&_xmitq);  } -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster - *                    and to identified node local sockets +static void tipc_bcast_select_xmit_method(struct net *net, int dests, +					  struct tipc_mc_method *method) +{ +	struct 
tipc_bc_base *bb = tipc_bc_base(net); +	unsigned long exp = method->expires; + +	/* Broadcast supported by used bearer/bearers? */ +	if (!bb->bcast_support) { +		method->rcast = true; +		return; +	} +	/* Any destinations which don't support replicast ? */ +	if (!bb->rcast_support) { +		method->rcast = false; +		return; +	} +	/* Can current method be changed ? */ +	method->expires = jiffies + TIPC_METHOD_EXPIRE; +	if (method->mandatory || time_before(jiffies, exp)) +		return; + +	/* Determine method to use now */ +	method->rcast = dests <= bb->bc_threshold; +} + +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes   * @net: the applicable net namespace - * @list: chain of buffers containing message - * Consumes the buffer chain, except when returning -ELINKCONG - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + * @pkts: chain of buffers containing message + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0 + * Consumes the buffer chain. + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE   */ -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, +			   u16 *cong_link_cnt)  {  	struct tipc_link *l = tipc_bc_sndlink(net); -	struct sk_buff_head xmitq, inputq, rcvq; +	struct sk_buff_head xmitq;  	int rc = 0; -	__skb_queue_head_init(&rcvq);  	__skb_queue_head_init(&xmitq); -	skb_queue_head_init(&inputq); - -	/* Prepare message clone for local node */ -	if (unlikely(!tipc_msg_reassemble(list, &rcvq))) -		return -EHOSTUNREACH; -  	tipc_bcast_lock(net);  	if (tipc_link_bc_peers(l)) -		rc = tipc_link_xmit(l, list, &xmitq); +		rc = tipc_link_xmit(l, pkts, &xmitq);  	tipc_bcast_unlock(net); - -	/* Don't send to local node if adding to link failed */ -	if (unlikely(rc)) { -		__skb_queue_purge(&rcvq); -		return rc; +	tipc_bcbase_xmit(net, &xmitq); +	__skb_queue_purge(pkts); +	if (rc == -ELINKCONG) { +		*cong_link_cnt = 1; +		rc = 0;  	} +	return rc; +} -	/* Broadcast to all nodes, inluding local node */ -	tipc_bcbase_xmit(net, &xmitq); -	tipc_sk_mcast_rcv(net, &rcvq, &inputq); -	__skb_queue_purge(list); +/* tipc_rcast_xmit - replicate and send a message to given destination nodes + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @dests: list of destination nodes + * @cong_link_cnt: returns number of congested links + * @cong_links: returns identities of congested links + * Returns 0 if success, otherwise errno + */ +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, +			   struct tipc_nlist *dests, u16 *cong_link_cnt) +{ +	struct sk_buff_head _pkts; +	struct u32_item *n, *tmp; +	u32 dst, selector; + +	selector = msg_link_selector(buf_msg(skb_peek(pkts))); +	__skb_queue_head_init(&_pkts); + +	list_for_each_entry_safe(n, tmp, &dests->list, list) { +		dst = n->value; +		if (!tipc_msg_pskb_copy(dst, pkts, &_pkts)) +			return -ENOMEM; + +		/* Any other return value than -ELINKCONG is ignored */ +		if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG) +			(*cong_link_cnt)++; +	}  	return 0;  } +/* tipc_mcast_xmit - deliver message to indicated destination nodes + *                   and to identified node local sockets + * @net: the applicable net namespace + * @pkts: chain of buffers containing message + * @method: send method to be used + * @dests: destination nodes for message. 
+ * @cong_link_cnt: returns number of encountered congested destination links + * Consumes buffer chain. + * Returns 0 if success, otherwise errno + */ +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, +		    struct tipc_mc_method *method, struct tipc_nlist *dests, +		    u16 *cong_link_cnt) +{ +	struct sk_buff_head inputq, localq; +	int rc = 0; + +	skb_queue_head_init(&inputq); +	skb_queue_head_init(&localq); + +	/* Clone packets before they are consumed by next call */ +	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { +		rc = -ENOMEM; +		goto exit; +	} +	/* Send according to determined transmit method */ +	if (dests->remote) { +		tipc_bcast_select_xmit_method(net, dests->remote, method); +		if (method->rcast) +			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); +		else +			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); +	} + +	if (dests->local) +		tipc_sk_mcast_rcv(net, &localq, &inputq); +exit: +	/* This queue should normally be empty by now */ +	__skb_queue_purge(pkts); +	return rc; +} +  /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link   *   * RCU is locked, no other locks set @@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,  	tipc_bcast_lock(net);  	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);  	tipc_bcbase_select_primary(net); +	tipc_bcbase_calc_bc_threshold(net);  	tipc_bcast_unlock(net);  } @@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)  	tipc_bcast_lock(net);  	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);  	tipc_bcbase_select_primary(net); +	tipc_bcbase_calc_bc_threshold(net);  	tipc_bcast_unlock(net);  	tipc_bcbase_xmit(net, &xmitq); @@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)  		goto enomem;  	bb->link = l;  	tn->bcl = l; +	bb->rc_ratio = 25; +	bb->rcast_support = true;  	return 0;  enomem:  	kfree(bb); @@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)  	kfree(tn->bcbase);  	kfree(tn->bcl);  } + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self) +{ +	memset(nl, 0, sizeof(*nl)); +	INIT_LIST_HEAD(&nl->list); +	nl->self = self; +} + +void tipc_nlist_add(struct tipc_nlist *nl, u32 node) +{ +	if (node == nl->self) +		nl->local = true; +	else if (u32_push(&nl->list, node)) +		nl->remote++; +} + +void tipc_nlist_del(struct tipc_nlist *nl, u32 node) +{ +	if (node == nl->self) +		nl->local = false; +	else if (u32_del(&nl->list, node)) +		nl->remote--; +} + +void tipc_nlist_purge(struct tipc_nlist *nl) +{ +	u32_list_purge(&nl->list); +	nl->remote = 0; +	nl->local = 0; +} diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 855d53c64ab3..751530ab0c49 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -42,9 +42,35 @@  struct tipc_node;  struct tipc_msg;  struct tipc_nl_msg; -struct tipc_node_map; +struct tipc_nlist; +struct tipc_nitem;  extern const char tipc_bclink_name[]; +#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000) + +struct tipc_nlist { +	struct list_head list; +	u32 self; +	u16 remote; +	bool local; +}; + +void tipc_nlist_init(struct tipc_nlist *nl, u32 self); +void tipc_nlist_purge(struct tipc_nlist *nl); +void tipc_nlist_add(struct tipc_nlist *nl, u32 node); +void tipc_nlist_del(struct tipc_nlist *nl, u32 node); + +/* Cookie to be used between socket and broadcast layer + * @rcast: replicast (instead of broadcast) was used at previous xmit + * @mandatory: broadcast/replicast indication was set by user + * @expires: re-evaluate non-mandatory transmit method if we are past this + */ +struct tipc_mc_method { +	
bool rcast; +	bool mandatory; +	unsigned long expires; +}; +  int tipc_bcast_init(struct net *net);  void tipc_bcast_stop(struct net *net);  void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, @@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);  void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);  void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);  int  tipc_bcast_get_mtu(struct net *net); -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); +void tipc_bcast_disable_rcast(struct net *net); +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, +		    struct tipc_mc_method *method, struct tipc_nlist *dests, +		    u16 *cong_link_cnt);  int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);  void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,  			struct tipc_msg *hdr); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 52d74760fb68..33a5bdfbef76 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,  	memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));  	memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);  	b->bcast_addr.media_id = b->media->type_id; -	b->bcast_addr.broadcast = 1; +	b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;  	b->mtu = dev->mtu;  	b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);  	rcu_assign_pointer(dev->tipc_ptr, b); @@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,  	return 0;  } +bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id) +{ +	bool supp = false; +	struct tipc_bearer *b; + +	rcu_read_lock(); +	b = bearer_get(net, bearer_id); +	if (b) +		supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT); +	rcu_read_unlock(); +	return supp; +} +  int tipc_bearer_mtu(struct net *net, u32 bearer_id)  {  	int mtu = 0; diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 278ff7f616f9..635c9086e19a 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -60,9 +60,14 @@  #define TIPC_MEDIA_TYPE_IB	2  #define TIPC_MEDIA_TYPE_UDP	3 -/* minimum bearer MTU */ +/* Minimum bearer MTU */  #define TIPC_MIN_BEARER_MTU	(MAX_H_SIZE + INT_H_SIZE) +/* Identifiers for distinguishing between broadcast/multicast and replicast + */ +#define TIPC_BROADCAST_SUPPORT  1 +#define TIPC_REPLICAST_SUPPORT  2 +  /**   * struct tipc_media_addr - destination address used by TIPC bearers   * @value: address info (format defined by media) @@ -210,6 +215,7 @@ int tipc_bearer_setup(void);  void tipc_bearer_cleanup(void);  void tipc_bearer_stop(struct net *net);  int tipc_bearer_mtu(struct net *net, u32 bearer_id); +bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id);  void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,  			  struct sk_buff *skb,  			  struct tipc_media_addr *dest); diff --git a/net/tipc/link.c b/net/tipc/link.c index 4e8647aef01c..ddd2dd6f77aa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -515,6 +515,10 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,  	if (link_is_bc_sndlink(l))  		l->state = LINK_ESTABLISHED; +	/* Disable replicast if even a single peer doesn't support it */ +	if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST)) +		tipc_bcast_disable_rcast(net); +  	return true;  } @@ -776,60 +780,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)  /**   * link_schedule_user - schedule a message sender for wakeup 
after congestion - * @link: congested link - * @list: message that was attempted sent + * @l: congested link + * @hdr: header of message that is being sent   * Create pseudo msg to send back to user when congestion abates - * Does not consume buffer list   */ -static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) +static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)  { -	struct tipc_msg *msg = buf_msg(skb_peek(list)); -	int imp = msg_importance(msg); -	u32 oport = msg_origport(msg); -	u32 addr = tipc_own_addr(link->net); +	u32 dnode = tipc_own_addr(l->net); +	u32 dport = msg_origport(hdr);  	struct sk_buff *skb; -	/* This really cannot happen...  */ -	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { -		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); -		return -ENOBUFS; -	} -	/* Non-blocking sender: */ -	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) -		return -ELINKCONG; -  	/* Create and schedule wakeup pseudo message */  	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, -			      addr, addr, oport, 0, 0); +			      dnode, l->addr, dport, 0, 0);  	if (!skb)  		return -ENOBUFS; -	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); -	TIPC_SKB_CB(skb)->chain_imp = imp; -	skb_queue_tail(&link->wakeupq, skb); -	link->stats.link_congs++; +	msg_set_dest_droppable(buf_msg(skb), true); +	TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); +	skb_queue_tail(&l->wakeupq, skb); +	l->stats.link_congs++;  	return -ELINKCONG;  }  /**   * link_prepare_wakeup - prepare users for wakeup after congestion - * @link: congested link - * Move a number of waiting users, as permitted by available space in - * the send queue, from link wait queue to node wait queue for wakeup + * @l: congested link + * Wake up a number of waiting users, as permitted by available space + * in the send queue   */  void link_prepare_wakeup(struct tipc_link *l)  { -	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; -	int imp, lim;  	struct sk_buff *skb, *tmp; +	int imp, i = 0;  	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {  		imp = TIPC_SKB_CB(skb)->chain_imp; -		lim = l->backlog[imp].limit; -		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; -		if ((pnd[imp] + l->backlog[imp].len) >= lim) +		if (l->backlog[imp].len < l->backlog[imp].limit) { +			skb_unlink(skb, &l->wakeupq); +			skb_queue_tail(l->inputq, skb); +		} else if (i++ > 10) {  			break; -		skb_unlink(skb, &l->wakeupq); -		skb_queue_tail(l->inputq, skb); +		}  	}  } @@ -869,8 +860,7 @@ void tipc_link_reset(struct tipc_link *l)   * @list: chain of buffers containing message   * @xmitq: returned list of packets to be sent by caller   * - * Consumes the buffer chain, except when returning -ELINKCONG, - * since the caller then may want to make more send attempts. + * Consumes the buffer chain.   
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS   * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted   */ @@ -879,7 +869,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,  {  	struct tipc_msg *hdr = buf_msg(skb_peek(list));  	unsigned int maxwin = l->window; -	unsigned int i, imp = msg_importance(hdr); +	int imp = msg_importance(hdr);  	unsigned int mtu = l->mtu;  	u16 ack = l->rcv_nxt - 1;  	u16 seqno = l->snd_nxt; @@ -888,19 +878,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,  	struct sk_buff_head *backlogq = &l->backlogq;  	struct sk_buff *skb, *_skb, *bskb;  	int pkt_cnt = skb_queue_len(list); +	int rc = 0; -	/* Match msg importance against this and all higher backlog limits: */ -	if (!skb_queue_empty(backlogq)) { -		for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { -			if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) -				return link_schedule_user(l, list); -		} -	}  	if (unlikely(msg_size(hdr) > mtu)) {  		skb_queue_purge(list);  		return -EMSGSIZE;  	} +	/* Allow oversubscription of one data msg per source at congestion */ +	if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) { +		if (imp == TIPC_SYSTEM_IMPORTANCE) { +			pr_warn("%s<%s>, link overflow", link_rst_msg, l->name); +			return -ENOBUFS; +		} +		rc = link_schedule_user(l, hdr); +	} +  	if (pkt_cnt > 1) {  		l->stats.sent_fragmented++;  		l->stats.sent_fragments += pkt_cnt; @@ -946,7 +939,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,  		skb_queue_splice_tail_init(list, backlogq);  	}  	l->snd_nxt = seqno; -	return 0; +	return rc;  }  void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) @@ -1043,11 +1036,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,  static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,  			    struct sk_buff_head *inputq)  { -	switch (msg_user(buf_msg(skb))) { +	struct tipc_msg *hdr = buf_msg(skb); + +	switch (msg_user(hdr)) {  	case TIPC_LOW_IMPORTANCE:  	case TIPC_MEDIUM_IMPORTANCE:  	case TIPC_HIGH_IMPORTANCE:  	case TIPC_CRITICAL_IMPORTANCE: +		if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { +			skb_queue_tail(l->bc_rcvlink->inputq, skb); +			return true; +		}  	case CONN_MANAGER:  		skb_queue_tail(inputq, skb);  		return true; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index ab02d0742476..312ef7de57d7 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -607,6 +607,23 @@ error:  	return false;  } +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, +			struct sk_buff_head *cpy) +{ +	struct sk_buff *skb, *_skb; + +	skb_queue_walk(msg, skb) { +		_skb = pskb_copy(skb, GFP_ATOMIC); +		if (!_skb) { +			__skb_queue_purge(cpy); +			return false; +		} +		msg_set_destnode(buf_msg(_skb), dst); +		__skb_queue_tail(cpy, _skb); +	} +	return true; +} +  /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number   * @list: list to be appended to   * @seqno: sequence number of buffer to add diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 2c3dc38abf9c..c843fd2bc48d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -98,8 +98,6 @@ struct tipc_skb_cb {  	u32 bytes_read;  	struct sk_buff *tail;  	bool validated; -	bool wakeup_pending; -	u16 chain_sz;  	u16 chain_imp;  	u16 ackers;  }; @@ -633,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)  static inline u32 msg_link_selector(struct tipc_msg *m)  { +	if (msg_user(m) == MSG_FRAGMENTER) +		m = (void *)msg_data(m);  	return msg_bits(m, 4, 
0, 1);  } -static inline void msg_set_link_selector(struct tipc_msg *m, u32 n) -{ -	msg_set_bits(m, 4, 0, 1, n); -} -  /*   * Word 5   */ @@ -837,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,  		   int offset, int dsz, int mtu, struct sk_buff_head *list);  bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);  bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, +			struct sk_buff_head *cpy);  void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,  			     struct sk_buff *skb); diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index e190460fe0d3..9be6592e4a6f 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -608,7 +608,7 @@ not_found:   * Returns non-zero if any off-node ports overlap   */  int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, -			      u32 limit, struct tipc_plist *dports) +			      u32 limit, struct list_head *dports)  {  	struct name_seq *seq;  	struct sub_seq *sseq; @@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,  		info = sseq->info;  		list_for_each_entry(publ, &info->node_list, node_list) {  			if (publ->scope <= limit) -				tipc_plist_push(dports, publ->ref); +				u32_push(dports, publ->ref);  		}  		if (info->cluster_list_size != info->node_list_size) @@ -645,6 +645,39 @@ exit:  	return res;  } +/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes + * - Creates list of nodes that overlap the given multicast address + * - Determines if any node local ports overlap + */ +void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, +				   u32 upper, u32 domain, +				   struct tipc_nlist *nodes) +{ +	struct sub_seq *sseq, *stop; +	struct publication *publ; +	struct name_info *info; +	struct name_seq *seq; + +	rcu_read_lock(); +	seq = nametbl_find_seq(net, type); +	if (!seq) +		goto exit; + +	spin_lock_bh(&seq->lock); +	sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); +	stop = seq->sseqs + seq->first_free; +	for (; sseq->lower <= upper && sseq != stop; sseq++) { +		info = sseq->info; +		list_for_each_entry(publ, &info->zone_list, zone_list) { +			if (tipc_in_scope(domain, publ->node)) +				tipc_nlist_add(nodes, publ->node); +		} +	} +	spin_unlock_bh(&seq->lock); +exit: +	rcu_read_unlock(); +} +  /*   * tipc_nametbl_publish - add name publication to network name tables   */ @@ -1022,40 +1055,79 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)  	return skb->len;  } -void tipc_plist_push(struct tipc_plist *pl, u32 port) +bool u32_find(struct list_head *l, u32 value)  { -	struct tipc_plist *nl; +	struct u32_item *item; -	if (likely(!pl->port)) { -		pl->port = port; -		return; +	list_for_each_entry(item, l, list) { +		if (item->value == value) +			return true;  	} -	if (pl->port == port) -		return; -	list_for_each_entry(nl, &pl->list, list) { -		if (nl->port == port) -			return; +	return false; +} + +bool u32_push(struct list_head *l, u32 value) +{ +	struct u32_item *item; + +	list_for_each_entry(item, l, list) { +		if (item->value == value) +			return false; +	} +	item = kmalloc(sizeof(*item), GFP_ATOMIC); +	if (unlikely(!item)) +		return false; + +	item->value = value; +	list_add(&item->list, l); +	return true; +} + +u32 u32_pop(struct list_head *l) +{ +	struct u32_item *item; +	u32 value = 0; + +	if (list_empty(l)) +		return 0; +	item = list_first_entry(l, 
typeof(*item), list); +	value = item->value; +	list_del(&item->list); +	kfree(item); +	return value; +} + +bool u32_del(struct list_head *l, u32 value) +{ +	struct u32_item *item, *tmp; + +	list_for_each_entry_safe(item, tmp, l, list) { +		if (item->value != value) +			continue; +		list_del(&item->list); +		kfree(item); +		return true;  	} -	nl = kmalloc(sizeof(*nl), GFP_ATOMIC); -	if (nl) { -		nl->port = port; -		list_add(&nl->list, &pl->list); +	return false; +} + +void u32_list_purge(struct list_head *l) +{ +	struct u32_item *item, *tmp; + +	list_for_each_entry_safe(item, tmp, l, list) { +		list_del(&item->list); +		kfree(item);  	}  } -u32 tipc_plist_pop(struct tipc_plist *pl) +int u32_list_len(struct list_head *l)  { -	struct tipc_plist *nl; -	u32 port = 0; +	struct u32_item *item; +	int i = 0; -	if (likely(list_empty(&pl->list))) { -		port = pl->port; -		pl->port = 0; -		return port; +	list_for_each_entry(item, l, list) { +		i++;  	} -	nl = list_first_entry(&pl->list, typeof(*nl), list); -	port = nl->port; -	list_del(&nl->list); -	kfree(nl); -	return port; +	return i;  } diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 1524a73830f7..6ebdeb1d84a5 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -39,6 +39,7 @@  struct tipc_subscription;  struct tipc_plist; +struct tipc_nlist;  /*   * TIPC name types reserved for internal TIPC use (both current and planned) @@ -99,7 +100,10 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);  u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);  int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, -			      u32 limit, struct tipc_plist *dports); +			      u32 limit, struct list_head *dports); +void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, +				   u32 upper, u32 domain, +				   struct tipc_nlist *nodes);  struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,  					 u32 upper, u32 scope, u32 port_ref,  					 u32 key); @@ -116,18 +120,16 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);  int tipc_nametbl_init(struct net *net);  void tipc_nametbl_stop(struct net *net); -struct tipc_plist { +struct u32_item {  	struct list_head list; -	u32 port; +	u32 value;  }; -static inline void tipc_plist_init(struct tipc_plist *pl) -{ -	INIT_LIST_HEAD(&pl->list); -	pl->port = 0; -} - -void tipc_plist_push(struct tipc_plist *pl, u32 port); -u32 tipc_plist_pop(struct tipc_plist *pl); +bool u32_push(struct list_head *l, u32 value); +u32 u32_pop(struct list_head *l); +bool u32_find(struct list_head *l, u32 value); +bool u32_del(struct list_head *l, u32 value); +void u32_list_purge(struct list_head *l); +int u32_list_len(struct list_head *l);  #endif diff --git a/net/tipc/net.c b/net/tipc/net.c index 28bf4feeb81c..ab8a2d5d1e32 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -110,6 +110,10 @@ int tipc_net_start(struct net *net, u32 addr)  	char addr_string[16];  	tn->own_addr = addr; + +	/* Ensure that the new address is visible before we reinit. 
*/ +	smp_mb(); +  	tipc_named_reinit(net);  	tipc_sk_reinit(net); diff --git a/net/tipc/node.c b/net/tipc/node.c index 27753325e06e..e9295fa3a554 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1172,7 +1172,7 @@ msg_full:   * @list: chain of buffers containing message   * @dnode: address of destination node   * @selector: a number used for deterministic link selection - * Consumes the buffer chain, except when returning -ELINKCONG + * Consumes the buffer chain.   * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF   */  int tipc_node_xmit(struct net *net, struct sk_buff_head *list, @@ -1211,10 +1211,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,  	spin_unlock_bh(&le->lock);  	tipc_node_read_unlock(n); -	if (likely(rc == 0)) -		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); -	else if (rc == -ENOBUFS) +	if (unlikely(rc == -ENOBUFS))  		tipc_node_link_down(n, bearer_id, false); +	else +		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);  	tipc_node_put(n); @@ -1226,20 +1226,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,   * messages, which will not be rejected   * The only exception is datagram messages rerouted after secondary   * lookup, which are rare and safe to dispose of anyway. - * TODO: Return real return value, and let callers use - * tipc_wait_for_sendpkt() where applicable   */  int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,  		       u32 selector)  {  	struct sk_buff_head head; -	int rc;  	skb_queue_head_init(&head);  	__skb_queue_tail(&head, skb); -	rc = tipc_node_xmit(net, &head, dnode, selector); -	if (rc == -ELINKCONG) -		kfree_skb(skb); +	tipc_node_xmit(net, &head, dnode, selector);  	return 0;  } @@ -1267,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)  	kfree_skb(skb);  } +static void tipc_node_mcast_rcv(struct tipc_node *n) +{ +	struct tipc_bclink_entry *be = &n->bc_entry; + +	/* 'arrvq' is under inputq2's lock protection */ +	spin_lock_bh(&be->inputq2.lock); +	spin_lock_bh(&be->inputq1.lock); +	skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); +	spin_unlock_bh(&be->inputq1.lock); +	spin_unlock_bh(&be->inputq2.lock); +	tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2); +} +  static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,  				  int bearer_id, struct sk_buff_head *xmitq)  { @@ -1340,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id  	if (!skb_queue_empty(&xmitq))  		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); -	/* Deliver. 
'arrvq' is under inputq2's lock protection */ -	if (!skb_queue_empty(&be->inputq1)) { -		spin_lock_bh(&be->inputq2.lock); -		spin_lock_bh(&be->inputq1.lock); -		skb_queue_splice_tail_init(&be->inputq1, &be->arrvq); -		spin_unlock_bh(&be->inputq1.lock); -		spin_unlock_bh(&be->inputq2.lock); -		tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2); -	} +	if (!skb_queue_empty(&be->inputq1)) +		tipc_node_mcast_rcv(n);  	if (rc & TIPC_LINK_DOWN_EVT) {  		/* Reception reassembly failure => reset all links to peer */ @@ -1575,6 +1576,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)  	if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))  		tipc_named_rcv(net, &n->bc_entry.namedq); +	if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1))) +		tipc_node_mcast_rcv(n); +  	if (!skb_queue_empty(&le->inputq))  		tipc_sk_rcv(net, &le->inputq); diff --git a/net/tipc/node.h b/net/tipc/node.h index 39ef54c1f2ad..898c22916984 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -47,11 +47,13 @@  enum {  	TIPC_BCAST_SYNCH      = (1 << 1),  	TIPC_BCAST_STATE_NACK = (1 << 2), -	TIPC_BLOCK_FLOWCTL    = (1 << 3) +	TIPC_BLOCK_FLOWCTL    = (1 << 3), +	TIPC_BCAST_RCAST      = (1 << 4)  };  #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \  				TIPC_BCAST_STATE_NACK | \ +				TIPC_BCAST_RCAST | \  				TIPC_BLOCK_FLOWCTL)  #define INVALID_BEARER_ID -1 diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 800caaa699a1..6b09a778cc71 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -67,16 +67,19 @@ enum {   * @max_pkt: maximum packet size "hint" used when building messages sent by port   * @portid: unique port identity in TIPC socket hash table   * @phdr: preformatted message header used when sending messages + * #cong_links: list of congested links   * @publications: list of publications for port + * @blocking_link: address of the congested link we are currently sleeping on   * @pub_count: total # of publications port has made during its lifetime   * @probing_state:   * @conn_timeout: the time we can wait for an unresponded setup request   * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue - * @link_cong: non-zero if owner must sleep because of link congestion + * @cong_link_cnt: number of congested links   * @sent_unacked: # messages sent by socket, and not yet acked by peer   * @rcv_unacked: # messages read by user, but not yet acked back to peer   * @peer: 'connected' peer for dgram/rdm   * @node: hash table node + * @mc_method: cookie for use between socket and broadcast layer   * @rcu: rcu struct for tipc_sock   */  struct tipc_sock { @@ -87,13 +90,13 @@ struct tipc_sock {  	u32 max_pkt;  	u32 portid;  	struct tipc_msg phdr; -	struct list_head sock_list; +	struct list_head cong_links;  	struct list_head publications;  	u32 pub_count;  	uint conn_timeout;  	atomic_t dupl_rcvcnt;  	bool probe_unacked; -	bool link_cong; +	u16 cong_link_cnt;  	u16 snt_unacked;  	u16 snd_win;  	u16 peer_caps; @@ -101,6 +104,7 @@ struct tipc_sock {  	u16 rcv_win;  	struct sockaddr_tipc peer;  	struct rhash_head node; +	struct tipc_mc_method mc_method;  	struct rcu_head rcu;  }; @@ -110,7 +114,6 @@ static void tipc_write_space(struct sock *sk);  static void tipc_sock_destruct(struct sock *sk);  static int tipc_release(struct socket *sock);  static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);  static void tipc_sk_timeout(unsigned long data);  static int tipc_sk_publish(struct 
tipc_sock *tsk, uint scope,  			   struct tipc_name_seq const *seq); @@ -119,8 +122,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);  static int tipc_sk_insert(struct tipc_sock *tsk);  static void tipc_sk_remove(struct tipc_sock *tsk); -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, -			      size_t dsz); +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);  static const struct proto_ops packet_ops; @@ -334,6 +336,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)  	return res;  } +static int tipc_sk_sock_err(struct socket *sock, long *timeout) +{ +	struct sock *sk = sock->sk; +	int err = sock_error(sk); +	int typ = sock->type; + +	if (err) +		return err; +	if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) { +		if (sk->sk_state == TIPC_DISCONNECTING) +			return -EPIPE; +		else if (!tipc_sk_connected(sk)) +			return -ENOTCONN; +	} +	if (!*timeout) +		return -EAGAIN; +	if (signal_pending(current)) +		return sock_intr_errno(*timeout); + +	return 0; +} + +#define tipc_wait_for_cond(sock_, timeout_, condition_)			\ +({								        \ +	int rc_ = 0;							\ +	int done_ = 0;							\ +									\ +	while (!(condition_) && !done_) {				\ +		struct sock *sk_ = sock->sk;				\ +		DEFINE_WAIT_FUNC(wait_, woken_wake_function);		\ +									\ +		rc_ = tipc_sk_sock_err(sock_, timeout_);		\ +		if (rc_)						\ +			break;						\ +		prepare_to_wait(sk_sleep(sk_), &wait_,			\ +				TASK_INTERRUPTIBLE);			\ +		done_ = sk_wait_event(sk_, timeout_,			\ +				      (condition_), &wait_);		\ +		remove_wait_queue(sk_sleep(sk_), &wait_);		\ +	}								\ +	rc_;								\ +}) +  /**   * tipc_sk_create - create a TIPC socket   * @net: network namespace (must be default network) @@ -382,10 +427,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	tsk = tipc_sk(sk);  	tsk->max_pkt = MAX_PKT_DEFAULT;  	INIT_LIST_HEAD(&tsk->publications); +	INIT_LIST_HEAD(&tsk->cong_links);  	msg = &tsk->phdr;  	tn = net_generic(sock_net(sk), tipc_net_id); -	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, -		      NAMED_H_SIZE, 0);  	/* Finish initializing socket data structures */  	sock->ops = ops; @@ -395,6 +439,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  		pr_warn("Socket create failed; port number exhausted\n");  		return -EINVAL;  	} + +	/* Ensure tsk is visible before we read own_addr. */ +	smp_mb(); + +	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, +		      NAMED_H_SIZE, 0); +  	msg_set_origport(msg, tsk->portid);  	setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);  	sk->sk_shutdown = 0; @@ -432,9 +483,14 @@ static void __tipc_shutdown(struct socket *sock, int error)  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk);  	struct net *net = sock_net(sk); +	long timeout = CONN_TIMEOUT_DEFAULT;  	u32 dnode = tsk_peer_node(tsk);  	struct sk_buff *skb; +	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */ +	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && +					    !tsk_conn_cong(tsk))); +  	/* Reject all unreceived messages, except on an active connection  	 * (which disconnects locally & sends a 'FIN+' to peer).  	 
*/ @@ -505,7 +561,8 @@ static int tipc_release(struct socket *sock)  	/* Reject any messages that accumulated in backlog queue */  	release_sock(sk); - +	u32_list_purge(&tsk->cong_links); +	tsk->cong_link_cnt = 0;  	call_rcu(&tsk->rcu, tipc_sk_callback);  	sock->sk = NULL; @@ -648,7 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,  	switch (sk->sk_state) {  	case TIPC_ESTABLISHED: -		if (!tsk->link_cong && !tsk_conn_cong(tsk)) +		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))  			mask |= POLLOUT;  		/* fall thru' */  	case TIPC_LISTEN: @@ -657,7 +714,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,  			mask |= (POLLIN | POLLRDNORM);  		break;  	case TIPC_OPEN: -		if (!tsk->link_cong) +		if (!tsk->cong_link_cnt)  			mask |= POLLOUT;  		if (tipc_sk_type_connectionless(sk) &&  		    (!skb_queue_empty(&sk->sk_receive_queue))) @@ -676,63 +733,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,   * @sock: socket structure   * @seq: destination address   * @msg: message to send - * @dsz: total length of message data - * @timeo: timeout to wait for wakeup + * @dlen: length of data to send + * @timeout: timeout to wait for wakeup   *   * Called from function tipc_sendmsg(), which has done all sanity checks   * Returns the number of bytes sent on success, or errno   */  static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq, -			  struct msghdr *msg, size_t dsz, long timeo) +			  struct msghdr *msg, size_t dlen, long timeout)  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_msg *hdr = &tsk->phdr;  	struct net *net = sock_net(sk); -	struct tipc_msg *mhdr = &tsk->phdr; -	struct sk_buff_head pktchain; -	struct iov_iter save = msg->msg_iter; -	uint mtu; +	int mtu = tipc_bcast_get_mtu(net); +	struct tipc_mc_method *method = &tsk->mc_method; +	u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); +	struct sk_buff_head pkts; +	struct tipc_nlist dsts;  	int rc; -	if (!timeo && tsk->link_cong) -		return -ELINKCONG; +	/* Block or return if any destination link is congested */ +	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); +	if (unlikely(rc)) +		return rc; -	msg_set_type(mhdr, TIPC_MCAST_MSG); -	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); -	msg_set_destport(mhdr, 0); -	msg_set_destnode(mhdr, 0); -	msg_set_nametype(mhdr, seq->type); -	msg_set_namelower(mhdr, seq->lower); -	msg_set_nameupper(mhdr, seq->upper); -	msg_set_hdr_sz(mhdr, MCAST_H_SIZE); +	/* Lookup destination nodes */ +	tipc_nlist_init(&dsts, tipc_own_addr(net)); +	tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, +				      seq->upper, domain, &dsts); +	if (!dsts.local && !dsts.remote) +		return -EHOSTUNREACH; -	skb_queue_head_init(&pktchain); +	/* Build message header */ +	msg_set_type(hdr, TIPC_MCAST_MSG); +	msg_set_hdr_sz(hdr, MCAST_H_SIZE); +	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); +	msg_set_destport(hdr, 0); +	msg_set_destnode(hdr, 0); +	msg_set_nametype(hdr, seq->type); +	msg_set_namelower(hdr, seq->lower); +	msg_set_nameupper(hdr, seq->upper); -new_mtu: -	mtu = tipc_bcast_get_mtu(net); -	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); -	if (unlikely(rc < 0)) -		return rc; +	/* Build message as chain of buffers */ +	skb_queue_head_init(&pkts); +	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); -	do { -		rc = tipc_bcast_xmit(net, &pktchain); -		if (likely(!rc)) -			return dsz; - -		if (rc == -ELINKCONG) { -			tsk->link_cong = 1; -			rc = tipc_wait_for_sndmsg(sock, 
&timeo); -			if (!rc) -				continue; -		} -		__skb_queue_purge(&pktchain); -		if (rc == -EMSGSIZE) { -			msg->msg_iter = save; -			goto new_mtu; -		} -		break; -	} while (1); -	return rc; +	/* Send message if build was successful */ +	if (unlikely(rc == dlen)) +		rc = tipc_mcast_xmit(net, &pkts, method, &dsts, +				     &tsk->cong_link_cnt); + +	tipc_nlist_purge(&dsts); + +	return rc ? rc : dlen;  }  /** @@ -746,7 +800,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		       struct sk_buff_head *inputq)  {  	struct tipc_msg *msg; -	struct tipc_plist dports; +	struct list_head dports;  	u32 portid;  	u32 scope = TIPC_CLUSTER_SCOPE;  	struct sk_buff_head tmpq; @@ -754,7 +808,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  	struct sk_buff *skb, *_skb;  	__skb_queue_head_init(&tmpq); -	tipc_plist_init(&dports); +	INIT_LIST_HEAD(&dports);  	skb = tipc_skb_peek(arrvq, &inputq->lock);  	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { @@ -768,8 +822,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		tipc_nametbl_mc_translate(net,  					  msg_nametype(msg), msg_namelower(msg),  					  msg_nameupper(msg), scope, &dports); -		portid = tipc_plist_pop(&dports); -		for (; portid; portid = tipc_plist_pop(&dports)) { +		portid = u32_pop(&dports); +		for (; portid; portid = u32_pop(&dports)) {  			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);  			if (_skb) {  				msg_set_destport(buf_msg(_skb), portid); @@ -830,31 +884,6 @@ exit:  	kfree_skb(skb);  } -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) -{ -	DEFINE_WAIT_FUNC(wait, woken_wake_function); -	struct sock *sk = sock->sk; -	struct tipc_sock *tsk = tipc_sk(sk); -	int done; - -	do { -		int err = sock_error(sk); -		if (err) -			return err; -		if (sk->sk_shutdown & SEND_SHUTDOWN) -			return -EPIPE; -		if (!*timeo_p) -			return -EAGAIN; -		if (signal_pending(current)) -			return sock_intr_errno(*timeo_p); - -		add_wait_queue(sk_sleep(sk), &wait); -		done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait); -		remove_wait_queue(sk_sleep(sk), &wait); -	} while (!done); -	return 0; -} -  /**   * tipc_sendmsg - send message in connectionless manner   * @sock: socket structure @@ -881,35 +910,38 @@ static int tipc_sendmsg(struct socket *sock,  	return ret;  } -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)  { -	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	struct sock *sk = sock->sk; -	struct tipc_sock *tsk = tipc_sk(sk);  	struct net *net = sock_net(sk); -	struct tipc_msg *mhdr = &tsk->phdr; -	u32 dnode, dport; -	struct sk_buff_head pktchain; -	bool is_connectionless = tipc_sk_type_connectionless(sk); -	struct sk_buff *skb; +	struct tipc_sock *tsk = tipc_sk(sk); +	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); +	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +	struct list_head *clinks = &tsk->cong_links; +	bool syn = !tipc_sk_type_connectionless(sk); +	struct tipc_msg *hdr = &tsk->phdr;  	struct tipc_name_seq *seq; -	struct iov_iter save; -	u32 mtu; -	long timeo; -	int rc; +	struct sk_buff_head pkts; +	u32 type, inst, domain; +	u32 dnode, dport; +	int mtu, rc; -	if (dsz > TIPC_MAX_USER_MSG_SIZE) +	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))  		return -EMSGSIZE; +  	if (unlikely(!dest)) { -		if (is_connectionless && tsk->peer.family == AF_TIPC) -			dest = &tsk->peer; -		else +		dest = &tsk->peer; +		if 
(!syn || dest->family != AF_TIPC)  			return -EDESTADDRREQ; -	} else if (unlikely(m->msg_namelen < sizeof(*dest)) || -		   dest->family != AF_TIPC) { -		return -EINVAL;  	} -	if (!is_connectionless) { + +	if (unlikely(m->msg_namelen < sizeof(*dest))) +		return -EINVAL; + +	if (unlikely(dest->family != AF_TIPC)) +		return -EINVAL; + +	if (unlikely(syn)) {  		if (sk->sk_state == TIPC_LISTEN)  			return -EPIPE;  		if (sk->sk_state != TIPC_OPEN) @@ -921,102 +953,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)  			tsk->conn_instance = dest->addr.name.name.instance;  		}  	} -	seq = &dest->addr.nameseq; -	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); -	if (dest->addrtype == TIPC_ADDR_MCAST) { -		return tipc_sendmcast(sock, seq, m, dsz, timeo); -	} else if (dest->addrtype == TIPC_ADDR_NAME) { -		u32 type = dest->addr.name.name.type; -		u32 inst = dest->addr.name.name.instance; -		u32 domain = dest->addr.name.domain; +	seq = &dest->addr.nameseq; +	if (dest->addrtype == TIPC_ADDR_MCAST) +		return tipc_sendmcast(sock, seq, m, dlen, timeout); +	if (dest->addrtype == TIPC_ADDR_NAME) { +		type = dest->addr.name.name.type; +		inst = dest->addr.name.name.instance; +		domain = dest->addr.name.domain;  		dnode = domain; -		msg_set_type(mhdr, TIPC_NAMED_MSG); -		msg_set_hdr_sz(mhdr, NAMED_H_SIZE); -		msg_set_nametype(mhdr, type); -		msg_set_nameinst(mhdr, inst); -		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); +		msg_set_type(hdr, TIPC_NAMED_MSG); +		msg_set_hdr_sz(hdr, NAMED_H_SIZE); +		msg_set_nametype(hdr, type); +		msg_set_nameinst(hdr, inst); +		msg_set_lookup_scope(hdr, tipc_addr_scope(domain));  		dport = tipc_nametbl_translate(net, type, inst, &dnode); -		msg_set_destnode(mhdr, dnode); -		msg_set_destport(mhdr, dport); +		msg_set_destnode(hdr, dnode); +		msg_set_destport(hdr, dport);  		if (unlikely(!dport && !dnode))  			return -EHOSTUNREACH; +  	} else if (dest->addrtype == TIPC_ADDR_ID) {  		dnode = dest->addr.id.node; -		msg_set_type(mhdr, TIPC_DIRECT_MSG); -		msg_set_lookup_scope(mhdr, 0); -		msg_set_destnode(mhdr, dnode); -		msg_set_destport(mhdr, dest->addr.id.ref); -		msg_set_hdr_sz(mhdr, BASIC_H_SIZE); +		msg_set_type(hdr, TIPC_DIRECT_MSG); +		msg_set_lookup_scope(hdr, 0); +		msg_set_destnode(hdr, dnode); +		msg_set_destport(hdr, dest->addr.id.ref); +		msg_set_hdr_sz(hdr, BASIC_H_SIZE);  	} -	skb_queue_head_init(&pktchain); -	save = m->msg_iter; -new_mtu: -	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); -	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); -	if (rc < 0) +	/* Block or return if destination link is congested */ +	rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); +	if (unlikely(rc))  		return rc; -	do { -		skb = skb_peek(&pktchain); -		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; -		rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); -		if (likely(!rc)) { -			if (!is_connectionless) -				tipc_set_sk_state(sk, TIPC_CONNECTING); -			return dsz; -		} -		if (rc == -ELINKCONG) { -			tsk->link_cong = 1; -			rc = tipc_wait_for_sndmsg(sock, &timeo); -			if (!rc) -				continue; -		} -		__skb_queue_purge(&pktchain); -		if (rc == -EMSGSIZE) { -			m->msg_iter = save; -			goto new_mtu; -		} -		break; -	} while (1); - -	return rc; -} +	skb_queue_head_init(&pkts); +	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); +	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); +	if (unlikely(rc != dlen)) +		return rc; -static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) -{ -	DEFINE_WAIT_FUNC(wait, 
woken_wake_function); -	struct sock *sk = sock->sk; -	struct tipc_sock *tsk = tipc_sk(sk); -	int done; +	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); +	if (unlikely(rc == -ELINKCONG)) { +		u32_push(clinks, dnode); +		tsk->cong_link_cnt++; +		rc = 0; +	} -	do { -		int err = sock_error(sk); -		if (err) -			return err; -		if (sk->sk_state == TIPC_DISCONNECTING) -			return -EPIPE; -		else if (!tipc_sk_connected(sk)) -			return -ENOTCONN; -		if (!*timeo_p) -			return -EAGAIN; -		if (signal_pending(current)) -			return sock_intr_errno(*timeo_p); +	if (unlikely(syn && !rc)) +		tipc_set_sk_state(sk, TIPC_CONNECTING); -		add_wait_queue(sk_sleep(sk), &wait); -		done = sk_wait_event(sk, timeo_p, -				     (!tsk->link_cong && -				      !tsk_conn_cong(tsk)) || -				      !tipc_sk_connected(sk), &wait); -		remove_wait_queue(sk_sleep(sk), &wait); -	} while (!done); -	return 0; +	return rc ? rc : dlen;  }  /** - * tipc_send_stream - send stream-oriented data + * tipc_sendstream - send stream-oriented data   * @sock: socket structure   * @m: data to send   * @dsz: total length of data to be transmitted @@ -1026,94 +1018,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)   * Returns the number of bytes sent on success (or partial success),   * or errno if no data sent   */ -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)  {  	struct sock *sk = sock->sk;  	int ret;  	lock_sock(sk); -	ret = __tipc_send_stream(sock, m, dsz); +	ret = __tipc_sendstream(sock, m, dsz);  	release_sock(sk);  	return ret;  } -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)  {  	struct sock *sk = sock->sk; -	struct net *net = sock_net(sk); -	struct tipc_sock *tsk = tipc_sk(sk); -	struct tipc_msg *mhdr = &tsk->phdr; -	struct sk_buff_head pktchain;  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); -	u32 portid = tsk->portid; -	int rc = -EINVAL; -	long timeo; -	u32 dnode; -	uint mtu, send, sent = 0; -	struct iov_iter save; -	int hlen = MIN_H_SIZE; - -	/* Handle implied connection establishment */ -	if (unlikely(dest)) { -		rc = __tipc_sendmsg(sock, m, dsz); -		hlen = msg_hdr_sz(mhdr); -		if (dsz && (dsz == rc)) -			tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); -		return rc; -	} -	if (dsz > (uint)INT_MAX) -		return -EMSGSIZE; - -	if (unlikely(!tipc_sk_connected(sk))) { -		if (sk->sk_state == TIPC_DISCONNECTING) -			return -EPIPE; -		else -			return -ENOTCONN; -	} +	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_msg *hdr = &tsk->phdr; +	struct net *net = sock_net(sk); +	struct sk_buff_head pkts; +	u32 dnode = tsk_peer_node(tsk); +	int send, sent = 0; +	int rc = 0; -	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); -	if (!timeo && tsk->link_cong) -		return -ELINKCONG; +	skb_queue_head_init(&pkts); -	dnode = tsk_peer_node(tsk); -	skb_queue_head_init(&pktchain); +	if (unlikely(dlen > INT_MAX)) +		return -EMSGSIZE; -next: -	save = m->msg_iter; -	mtu = tsk->max_pkt; -	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); -	rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); -	if (unlikely(rc < 0)) +	/* Handle implicit connection setup */ +	if (unlikely(dest)) { +		rc = __tipc_sendmsg(sock, m, dlen); +		if (dlen && (dlen == rc)) +			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));  		return 
rc; +	}  	do { -		if (likely(!tsk_conn_cong(tsk))) { -			rc = tipc_node_xmit(net, &pktchain, dnode, portid); -			if (likely(!rc)) { -				tsk->snt_unacked += tsk_inc(tsk, send + hlen); -				sent += send; -				if (sent == dsz) -					return dsz; -				goto next; -			} -			if (rc == -EMSGSIZE) { -				__skb_queue_purge(&pktchain); -				tsk->max_pkt = tipc_node_get_mtu(net, dnode, -								 portid); -				m->msg_iter = save; -				goto next; -			} -			if (rc != -ELINKCONG) -				break; +		rc = tipc_wait_for_cond(sock, &timeout, +					(!tsk->cong_link_cnt && +					 !tsk_conn_cong(tsk) && +					 tipc_sk_connected(sk))); +		if (unlikely(rc)) +			break; -			tsk->link_cong = 1; +		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); +		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); +		if (unlikely(rc != send)) +			break; + +		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); +		if (unlikely(rc == -ELINKCONG)) { +			tsk->cong_link_cnt = 1; +			rc = 0;  		} -		rc = tipc_wait_for_sndpkt(sock, &timeo); -	} while (!rc); +		if (likely(!rc)) { +			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); +			sent += send; +		} +	} while (sent < dlen && !rc); -	__skb_queue_purge(&pktchain); -	return sent ? sent : rc; +	return rc ? rc : sent;  }  /** @@ -1131,7 +1098,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)  	if (dsz > TIPC_MAX_USER_MSG_SIZE)  		return -EMSGSIZE; -	return tipc_send_stream(sock, m, dsz); +	return tipc_sendstream(sock, m, dsz);  }  /* tipc_sk_finish_conn - complete the setup of a connection @@ -1698,6 +1665,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,  	unsigned int limit = rcvbuf_limit(sk, skb);  	int err = TIPC_OK;  	int usr = msg_user(hdr); +	u32 onode;  	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {  		tipc_sk_proto_rcv(tsk, skb, xmitq); @@ -1705,8 +1673,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,  	}  	if (unlikely(usr == SOCK_WAKEUP)) { +		onode = msg_orignode(hdr);  		kfree_skb(skb); -		tsk->link_cong = 0; +		u32_del(&tsk->cong_links, onode); +		tsk->cong_link_cnt--;  		sk->sk_write_space(sk);  		return false;  	} @@ -2114,7 +2084,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)  		struct msghdr m = {NULL,};  		tsk_advance_rx_queue(sk); -		__tipc_send_stream(new_sock, &m, 0); +		__tipc_sendstream(new_sock, &m, 0);  	} else {  		__skb_dequeue(&sk->sk_receive_queue);  		__skb_queue_head(&new_sk->sk_receive_queue, buf); @@ -2269,24 +2239,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  void tipc_sk_reinit(struct net *net)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id); -	const struct bucket_table *tbl; -	struct rhash_head *pos; +	struct rhashtable_iter iter;  	struct tipc_sock *tsk;  	struct tipc_msg *msg; -	int i; -	rcu_read_lock(); -	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); -	for (i = 0; i < tbl->size; i++) { -		rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { +	rhashtable_walk_enter(&tn->sk_rht, &iter); + +	do { +		tsk = ERR_PTR(rhashtable_walk_start(&iter)); +		if (tsk) +			continue; + +		while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {  			spin_lock_bh(&tsk->sk.sk_lock.slock);  			msg = &tsk->phdr;  			msg_set_prevnode(msg, tn->own_addr);  			msg_set_orignode(msg, tn->own_addr);  			spin_unlock_bh(&tsk->sk.sk_lock.slock);  		} -	} -	rcu_read_unlock(); + +		rhashtable_walk_stop(&iter); +	} while (tsk == ERR_PTR(-EAGAIN));  }  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 
portid) @@ -2382,18 +2355,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); -	u32 value; -	int res; +	u32 value = 0; +	int res = 0;  	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))  		return 0;  	if (lvl != SOL_TIPC)  		return -ENOPROTOOPT; -	if (ol < sizeof(value)) -		return -EINVAL; -	res = get_user(value, (u32 __user *)ov); -	if (res) -		return res; + +	switch (opt) { +	case TIPC_IMPORTANCE: +	case TIPC_SRC_DROPPABLE: +	case TIPC_DEST_DROPPABLE: +	case TIPC_CONN_TIMEOUT: +		if (ol < sizeof(value)) +			return -EINVAL; +		res = get_user(value, (u32 __user *)ov); +		if (res) +			return res; +		break; +	default: +		if (ov || ol) +			return -EINVAL; +	}  	lock_sock(sk); @@ -2412,7 +2396,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,  		break;  	case TIPC_CONN_TIMEOUT:  		tipc_sk(sk)->conn_timeout = value; -		/* no need to set "res", since already 0 at this point */ +		break; +	case TIPC_MCAST_BROADCAST: +		tsk->mc_method.rcast = false; +		tsk->mc_method.mandatory = true; +		break; +	case TIPC_MCAST_REPLICAST: +		tsk->mc_method.rcast = true; +		tsk->mc_method.mandatory = true;  		break;  	default:  		res = -EINVAL; @@ -2575,7 +2566,7 @@ static const struct proto_ops stream_ops = {  	.shutdown	= tipc_shutdown,  	.setsockopt	= tipc_setsockopt,  	.getsockopt	= tipc_getsockopt, -	.sendmsg	= tipc_send_stream, +	.sendmsg	= tipc_sendstream,  	.recvmsg	= tipc_recv_stream,  	.mmap		= sock_no_mmap,  	.sendpage	= sock_no_sendpage diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index b58dc95f3d35..46061cf48cd1 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -113,7 +113,7 @@ static void tipc_udp_media_addr_set(struct tipc_media_addr *addr,  	memcpy(addr->value, ua, sizeof(struct udp_media_addr));  	if (tipc_udp_is_mcast_addr(ua)) -		addr->broadcast = 1; +		addr->broadcast = TIPC_BROADCAST_SUPPORT;  }  /* tipc_udp_addr2str - convert ip/udp address to string */ @@ -229,7 +229,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,  		goto out;  	} -	if (!addr->broadcast || list_empty(&ub->rcast.list)) +	if (addr->broadcast != TIPC_REPLICAST_SUPPORT)  		return tipc_udp_xmit(net, skb, ub, src, dst);  	/* Replicast, send an skb to each configured IP address */ @@ -296,7 +296,7 @@ static int tipc_udp_rcast_add(struct tipc_bearer *b,  	else if (ntohs(addr->proto) == ETH_P_IPV6)  		pr_info("New replicast peer: %pI6\n", &rcast->addr.ipv6);  #endif - +	b->bcast_addr.broadcast = TIPC_REPLICAST_SUPPORT;  	list_add_rcu(&rcast->list, &ub->rcast.list);  	return 0;  } @@ -681,7 +681,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b,  		goto err;  	b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP; -	b->bcast_addr.broadcast = 1; +	b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;  	rcu_assign_pointer(b->media_ptr, ub);  	rcu_assign_pointer(ub->bearer, b);  	tipc_udp_media_addr_set(&b->addr, &local); | 
