Diffstat (limited to 'lib/sbitmap.c')
-rw-r--r--	lib/sbitmap.c	109
1 file changed, 70 insertions, 39 deletions
diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 29eb0484215a..a8108a962dfd 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -533,21 +533,20 @@ unsigned long __sbitmap_queue_get_batch(struct sbitmap_queue *sbq, int nr_tags,
 		nr = find_first_zero_bit(&map->word, map_depth);
 		if (nr + nr_tags <= map_depth) {
 			atomic_long_t *ptr = (atomic_long_t *) &map->word;
-			int map_tags = min_t(int, nr_tags, map_depth);
-			unsigned long val, ret;
+			unsigned long val;
 
-			get_mask = ((1UL << map_tags) - 1) << nr;
+			get_mask = ((1UL << nr_tags) - 1) << nr;
+			val = READ_ONCE(map->word);
 			do {
-				val = READ_ONCE(map->word);
 				if ((val & ~get_mask) != val)
 					goto next;
-				ret = atomic_long_cmpxchg(ptr, val, get_mask | val);
-			} while (ret != val);
-			get_mask = (get_mask & ~ret) >> nr;
+			} while (!atomic_long_try_cmpxchg(ptr, &val,
+							  get_mask | val));
+			get_mask = (get_mask & ~val) >> nr;
 			if (get_mask) {
 				*offset = nr + (index << sb->shift);
 				update_alloc_hint_after_get(sb, depth, hint,
-							*offset + map_tags - 1);
+							*offset + nr_tags - 1);
 				return get_mask;
 			}
 		}
@@ -588,7 +587,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
 		struct sbq_wait_state *ws = &sbq->ws[wake_index];
 
-		if (waitqueue_active(&ws->wait)) {
+		if (waitqueue_active(&ws->wait) && atomic_read(&ws->wait_cnt)) {
 			if (wake_index != atomic_read(&sbq->wake_index))
 				atomic_set(&sbq->wake_index, wake_index);
 			return ws;
@@ -600,50 +599,82 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 	return NULL;
 }
 
-static bool __sbq_wake_up(struct sbitmap_queue *sbq)
+static bool __sbq_wake_up(struct sbitmap_queue *sbq, int *nr)
 {
 	struct sbq_wait_state *ws;
 	unsigned int wake_batch;
-	int wait_cnt;
+	int wait_cnt, cur, sub;
+	bool ret;
+
+	if (*nr <= 0)
+		return false;
 
 	ws = sbq_wake_ptr(sbq);
 	if (!ws)
 		return false;
 
-	wait_cnt = atomic_dec_return(&ws->wait_cnt);
-	if (wait_cnt <= 0) {
-		int ret;
-
-		wake_batch = READ_ONCE(sbq->wake_batch);
-
+	cur = atomic_read(&ws->wait_cnt);
+	do {
 		/*
-		 * Pairs with the memory barrier in sbitmap_queue_resize() to
-		 * ensure that we see the batch size update before the wait
-		 * count is reset.
+		 * For concurrent callers of this, callers should call this
+		 * function again to wakeup a new batch on a different 'ws'.
 		 */
-		smp_mb__before_atomic();
+		if (cur == 0)
+			return true;
+		sub = min(*nr, cur);
+		wait_cnt = cur - sub;
+	} while (!atomic_try_cmpxchg(&ws->wait_cnt, &cur, wait_cnt));
 
-		/*
-		 * For concurrent callers of this, the one that failed the
-		 * atomic_cmpxhcg() race should call this function again
-		 * to wakeup a new batch on a different 'ws'.
-		 */
-		ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
-		if (ret == wait_cnt) {
-			sbq_index_atomic_inc(&sbq->wake_index);
-			wake_up_nr(&ws->wait, wake_batch);
-			return false;
-		}
+	/*
+	 * If we decremented queue without waiters, retry to avoid lost
+	 * wakeups.
+	 */
+	if (wait_cnt > 0)
+		return !waitqueue_active(&ws->wait);
 
-		return true;
-	}
+	*nr -= sub;
 
-	return false;
+	/*
+	 * When wait_cnt == 0, we have to be particularly careful as we are
+	 * responsible to reset wait_cnt regardless whether we've actually
+	 * woken up anybody. But in case we didn't wakeup anybody, we still
+	 * need to retry.
+	 */
+	ret = !waitqueue_active(&ws->wait);
+	wake_batch = READ_ONCE(sbq->wake_batch);
+
+	/*
+	 * Wake up first in case that concurrent callers decrease wait_cnt
+	 * while waitqueue is empty.
+	 */
+	wake_up_nr(&ws->wait, wake_batch);
+
+	/*
+	 * Pairs with the memory barrier in sbitmap_queue_resize() to
+	 * ensure that we see the batch size update before the wait
+	 * count is reset.
+	 *
+	 * Also pairs with the implicit barrier between decrementing wait_cnt
+	 * and checking for waitqueue_active() to make sure waitqueue_active()
+	 * sees result of the wakeup if atomic_dec_return() has seen the result
+	 * of atomic_set().
+	 */
+	smp_mb__before_atomic();
+
+	/*
+	 * Increase wake_index before updating wait_cnt, otherwise concurrent
+	 * callers can see valid wait_cnt in old waitqueue, which can cause
+	 * invalid wakeup on the old waitqueue.
+	 */
+	sbq_index_atomic_inc(&sbq->wake_index);
+	atomic_set(&ws->wait_cnt, wake_batch);
+
+	return ret || *nr;
 }
 
-void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
+void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
 {
-	while (__sbq_wake_up(sbq))
+	while (__sbq_wake_up(sbq, &nr))
 		;
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
@@ -683,7 +714,7 @@ void sbitmap_queue_clear_batch(struct sbitmap_queue *sbq, int offset,
 		atomic_long_andnot(mask, (atomic_long_t *) addr);
 
 	smp_mb__after_atomic();
-	sbitmap_queue_wake_up(sbq);
+	sbitmap_queue_wake_up(sbq, nr_tags);
 	sbitmap_update_cpu_hint(&sbq->sb, raw_smp_processor_id(),
 					tags[nr_tags - 1] - offset);
 }
@@ -711,7 +742,7 @@ void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
 	 * waiter. See the comment on waitqueue_active().
 	 */
 	smp_mb__after_atomic();
-	sbitmap_queue_wake_up(sbq);
+	sbitmap_queue_wake_up(sbq, 1);
 	sbitmap_update_cpu_hint(&sbq->sb, cpu, nr);
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_clear);
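The heart of the change is that sbitmap_queue_wake_up() now takes the number of freed tags, and __sbq_wake_up() consumes up to that many wakeups from ws->wait_cnt in a single atomic_try_cmpxchg() loop instead of one atomic_dec_return() per call. The sketch below models just that accounting in userspace C, as a simplified illustration rather than the kernel implementation: fake_wait_state and consume_wakeups() are hypothetical names, C11 atomic_compare_exchange_weak() stands in for the kernel's atomic_try_cmpxchg(), and the waitqueue_active() checks, wake_up_nr() call, memory barriers and wait_cnt reset from the real code are omitted.

/*
 * Userspace sketch of the batched wait_cnt accounting shown in the diff
 * above.  fake_wait_state and consume_wakeups() are made-up names;
 * atomic_compare_exchange_weak() stands in for atomic_try_cmpxchg().
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct fake_wait_state {
	_Atomic int wait_cnt;		/* wakeups owed before the next batch */
};

/*
 * Consume up to *nr wakeups from ws->wait_cnt in one CAS loop.
 * Returns true if the caller should call again: either the count was
 * already claimed by another caller, or this caller still has wakeups left.
 */
static bool consume_wakeups(struct fake_wait_state *ws, int *nr)
{
	int cur, sub, next;

	if (*nr <= 0)
		return false;

	cur = atomic_load(&ws->wait_cnt);
	do {
		/* Already zero: another caller owns the reset; try another queue. */
		if (cur == 0)
			return true;
		sub = *nr < cur ? *nr : cur;
		next = cur - sub;
		/* On CAS failure, cur is refreshed with the live value. */
	} while (!atomic_compare_exchange_weak(&ws->wait_cnt, &cur, next));

	if (next > 0)
		return false;		/* batch threshold not reached yet */

	/*
	 * Count hit zero: the real code wakes wake_batch waiters and resets
	 * wait_cnt here; this caller consumes sub wakeups from its budget.
	 */
	*nr -= sub;
	return *nr > 0;
}

int main(void)
{
	struct fake_wait_state ws = { .wait_cnt = 2 };
	int nr = 5;
	bool again = consume_wakeups(&ws, &nr);

	/* wait_cnt went 2 -> 0 and 2 of the 5 wakeups were consumed. */
	printf("again=%d wait_cnt=%d nr=%d\n",
	       again, atomic_load(&ws.wait_cnt), nr);
	return 0;
}

In the real __sbq_wake_up(), the caller that drives wait_cnt to zero additionally wakes wake_batch waiters, advances wake_index and only then resets wait_cnt, in that order; the ordering constraints are the subject of the comments added in the hunk above.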
