Diffstat (limited to 'kernel/trace/ring_buffer.c')
-rw-r--r--	kernel/trace/ring_buffer.c	179
1 file changed, 154 insertions, 25 deletions
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 41ca394feb22..7f6059c5aa94 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -319,6 +319,11 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 #define TS_MASK		((1ULL << TS_SHIFT) - 1)
 #define TS_DELTA_TEST	(~TS_MASK)
 
+/* Flag when events were overwritten */
+#define RB_MISSED_EVENTS	(1 << 31)
+/* Missed count stored at end */
+#define RB_MISSED_STORED	(1 << 30)
+
 struct buffer_data_page {
 	u64		 time_stamp;	/* page time stamp */
 	local_t		 commit;	/* write committed index */
@@ -338,6 +343,7 @@ struct buffer_page {
 	local_t		 write;		/* index for next write */
 	unsigned	 read;		/* index for next read */
 	local_t		 entries;	/* entries on this page */
+	unsigned long	 real_end;	/* real end of data */
 	struct buffer_data_page *page;	/* Actual data page */
 };
 
@@ -417,6 +423,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 			       (unsigned int)sizeof(field.commit),
 			       (unsigned int)is_signed_type(long));
 
+	ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
+			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
+			       (unsigned int)offsetof(typeof(field), commit),
+			       1,
+			       (unsigned int)is_signed_type(long));
+
 	ret = trace_seq_printf(s, "\tfield: char data;\t"
 			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
 			       (unsigned int)offsetof(typeof(field), data),
@@ -440,6 +452,8 @@ struct ring_buffer_per_cpu {
 	struct buffer_page		*tail_page;	/* write to tail */
 	struct buffer_page		*commit_page;	/* committed pages */
 	struct buffer_page		*reader_page;
+	unsigned long			lost_events;
+	unsigned long			last_overrun;
 	local_t				commit_overrun;
 	local_t				overrun;
 	local_t				entries;
@@ -1762,6 +1776,13 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
 	kmemcheck_annotate_bitfield(event, bitfield);
 
 	/*
+	 * Save the original length to the meta data.
+	 * This will be used by the reader to add lost event
+	 * counter.
+	 */
+	tail_page->real_end = tail;
+
+	/*
 	 * If this event is bigger than the minimum size, then
 	 * we need to be careful that we don't subtract the
 	 * write counter enough to allow another writer to slip
@@ -1979,17 +2000,13 @@ rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
 		  u64 *ts, u64 *delta)
 {
 	struct ring_buffer_event *event;
-	static int once;
 	int ret;
 
-	if (unlikely(*delta > (1ULL << 59) && !once++)) {
-		printk(KERN_WARNING "Delta way too big! %llu"
-		       " ts=%llu write stamp = %llu\n",
-		       (unsigned long long)*delta,
-		       (unsigned long long)*ts,
-		       (unsigned long long)cpu_buffer->write_stamp);
-		WARN_ON(1);
-	}
+	WARN_ONCE(*delta > (1ULL << 59),
+		  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n",
+		  (unsigned long long)*delta,
+		  (unsigned long long)*ts,
+		  (unsigned long long)cpu_buffer->write_stamp);
 
 	/*
 	 * The delta is too big, we to add a
@@ -2838,6 +2855,7 @@ static struct buffer_page *
 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	struct buffer_page *reader = NULL;
+	unsigned long overwrite;
 	unsigned long flags;
 	int nr_loops = 0;
 	int ret;
@@ -2879,6 +2897,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	local_set(&cpu_buffer->reader_page->write, 0);
 	local_set(&cpu_buffer->reader_page->entries, 0);
 	local_set(&cpu_buffer->reader_page->page->commit, 0);
+	cpu_buffer->reader_page->real_end = 0;
 
  spin:
 	/*
@@ -2899,6 +2918,18 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
 
 	/*
+	 * We want to make sure we read the overruns after we set up our
+	 * pointers to the next object. The writer side does a
+	 * cmpxchg to cross pages which acts as the mb on the writer
+	 * side. Note, the reader will constantly fail the swap
+	 * while the writer is updating the pointers, so this
+	 * guarantees that the overwrite recorded here is the one we
+	 * want to compare with the last_overrun.
+	 */
+	smp_mb();
+	overwrite = local_read(&(cpu_buffer->overrun));
+
+	/*
 	 * Here's the tricky part.
 	 *
 	 * We need to move the pointer past the header page.
@@ -2929,6 +2960,11 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->reader_page = reader;
 	rb_reset_reader_page(cpu_buffer);
 
+	if (overwrite != cpu_buffer->last_overrun) {
+		cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
+		cpu_buffer->last_overrun = overwrite;
+	}
+
 	goto again;
 
  out:
@@ -3005,8 +3041,14 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
 		rb_advance_iter(iter);
 }
 
+static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	return cpu_buffer->lost_events;
+}
+
 static struct ring_buffer_event *
-rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
+rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
+	       unsigned long *lost_events)
 {
 	struct ring_buffer_event *event;
 	struct buffer_page *reader;
@@ -3058,6 +3100,8 @@ rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
 			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
 							 cpu_buffer->cpu, ts);
 		}
+		if (lost_events)
+			*lost_events = rb_lost_events(cpu_buffer);
 		return event;
 
 	default:
@@ -3168,12 +3212,14 @@ static inline int rb_ok_to_lock(void)
  * @buffer: The ring buffer to read
  * @cpu: The cpu to peak at
  * @ts: The timestamp counter of this event.
+ * @lost_events: a variable to store if events were lost (may be NULL)
  *
  * This will return the event that will be read next, but does
  * not consume the data.
  */
 struct ring_buffer_event *
-ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
+		 unsigned long *lost_events)
 {
 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
 	struct ring_buffer_event *event;
@@ -3188,7 +3234,7 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
 	local_irq_save(flags);
 	if (dolock)
 		spin_lock(&cpu_buffer->reader_lock);
-	event = rb_buffer_peek(cpu_buffer, ts);
+	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
 	if (event && event->type_len == RINGBUF_TYPE_PADDING)
 		rb_advance_reader(cpu_buffer);
 	if (dolock)
@@ -3230,13 +3276,17 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 /**
  * ring_buffer_consume - return an event and consume it
  * @buffer: The ring buffer to get the next event from
+ * @cpu: the cpu to read the buffer from
+ * @ts: a variable to store the timestamp (may be NULL)
+ * @lost_events: a variable to store if events were lost (may be NULL)
  *
  * Returns the next event in the ring buffer, and that event is consumed.
  * Meaning, that sequential reads will keep returning a different event,
  * and eventually empty the ring buffer if the producer is slower.
  */
 struct ring_buffer_event *
-ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
+ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
+		    unsigned long *lost_events)
 {
 	struct ring_buffer_per_cpu *cpu_buffer;
 	struct ring_buffer_event *event = NULL;
@@ -3257,9 +3307,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
 	if (dolock)
 		spin_lock(&cpu_buffer->reader_lock);
 
-	event = rb_buffer_peek(cpu_buffer, ts);
-	if (event)
+	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
+	if (event) {
+		cpu_buffer->lost_events = 0;
 		rb_advance_reader(cpu_buffer);
+	}
 
 	if (dolock)
 		spin_unlock(&cpu_buffer->reader_lock);
@@ -3276,23 +3328,30 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
 EXPORT_SYMBOL_GPL(ring_buffer_consume);
 
 /**
- * ring_buffer_read_start - start a non consuming read of the buffer
+ * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
  * @buffer: The ring buffer to read from
  * @cpu: The cpu buffer to iterate over
  *
- * This starts up an iteration through the buffer. It also disables
- * the recording to the buffer until the reading is finished.
- * This prevents the reading from being corrupted. This is not
- * a consuming read, so a producer is not expected.
+ * This performs the initial preparations necessary to iterate
+ * through the buffer.  Memory is allocated, buffer recording
+ * is disabled, and the iterator pointer is returned to the caller.
  *
- * Must be paired with ring_buffer_finish.
+ * Disabling buffer recordng prevents the reading from being
+ * corrupted. This is not a consuming read, so a producer is not
+ * expected.
+ *
+ * After a sequence of ring_buffer_read_prepare calls, the user is
+ * expected to make at least one call to ring_buffer_prepare_sync.
+ * Afterwards, ring_buffer_read_start is invoked to get things going
+ * for real.
+ *
+ * This overall must be paired with ring_buffer_finish.
  */
 struct ring_buffer_iter *
-ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
+ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
 {
 	struct ring_buffer_per_cpu *cpu_buffer;
 	struct ring_buffer_iter *iter;
-	unsigned long flags;
 
 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
 		return NULL;
@@ -3306,15 +3365,52 @@ ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
 	iter->cpu_buffer = cpu_buffer;
 
 	atomic_inc(&cpu_buffer->record_disabled);
+
+	return iter;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
+
+/**
+ * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
+ *
+ * All previously invoked ring_buffer_read_prepare calls to prepare
+ * iterators will be synchronized.  Afterwards, read_buffer_read_start
+ * calls on those iterators are allowed.
+ */
+void
+ring_buffer_read_prepare_sync(void)
+{
 	synchronize_sched();
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
+
+/**
+ * ring_buffer_read_start - start a non consuming read of the buffer
+ * @iter: The iterator returned by ring_buffer_read_prepare
+ *
+ * This finalizes the startup of an iteration through the buffer.
+ * The iterator comes from a call to ring_buffer_read_prepare and
+ * an intervening ring_buffer_read_prepare_sync must have been
+ * performed.
+ *
+ * Must be paired with ring_buffer_finish.
+ */
+void
+ring_buffer_read_start(struct ring_buffer_iter *iter)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags;
+
+	if (!iter)
+		return;
+
+	cpu_buffer = iter->cpu_buffer;
 
 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 	arch_spin_lock(&cpu_buffer->lock);
 	rb_iter_reset(iter);
 	arch_spin_unlock(&cpu_buffer->lock);
 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
-
-	return iter;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
 
@@ -3408,6 +3504,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->write_stamp = 0;
 	cpu_buffer->read_stamp = 0;
 
+	cpu_buffer->lost_events = 0;
+	cpu_buffer->last_overrun = 0;
+
 	rb_head_page_activate(cpu_buffer);
 }
 
@@ -3683,6 +3782,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 	struct ring_buffer_event *event;
 	struct buffer_data_page *bpage;
 	struct buffer_page *reader;
+	unsigned long missed_events;
 	unsigned long flags;
 	unsigned int commit;
 	unsigned int read;
@@ -3719,6 +3819,9 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 	read = reader->read;
 	commit = rb_page_commit(reader);
 
+	/* Check if any events were dropped */
+	missed_events = cpu_buffer->lost_events;
+
 	/*
 	 * If this page has been partially read or
 	 * if len is not big enough to read the rest of the page or
@@ -3779,9 +3882,35 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
 		local_set(&reader->entries, 0);
 		reader->read = 0;
 		*data_page = bpage;
+
+		/*
+		 * Use the real_end for the data size,
+		 * This gives us a chance to store the lost events
+		 * on the page.
+		 */
+		if (reader->real_end)
+			local_set(&bpage->commit, reader->real_end);
 	}
 	ret = read;
 
+	cpu_buffer->lost_events = 0;
+	/*
+	 * Set a flag in the commit field if we lost events
+	 */
+	if (missed_events) {
+		commit = local_read(&bpage->commit);
+
+		/* If there is room at the end of the page to save the
+		 * missed events, then record it there.
+		 */
+		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
+			memcpy(&bpage->data[commit], &missed_events,
+			       sizeof(missed_events));
+			local_add(RB_MISSED_STORED, &bpage->commit);
+		}
+		local_add(RB_MISSED_EVENTS, &bpage->commit);
+	}
+
  out_unlock:
 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
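The consume path now threads a lost-events count out to the caller through the extra ring_buffer_consume() argument. A minimal sketch of how an in-kernel reader might use the extended signature; the drain_cpu() helper and the pr_info() reporting are illustrative assumptions, not part of the patch, and the buffer pointer is assumed to come from an earlier ring_buffer_alloc().

	#include <linux/ring_buffer.h>
	#include <linux/kernel.h>

	static void drain_cpu(struct ring_buffer *buffer, int cpu)
	{
		struct ring_buffer_event *event;
		unsigned long lost_events;
		u64 ts;

		/*
		 * Passing &lost_events (NULL is still allowed) lets the
		 * reader see how many events were overwritten before the
		 * one being returned.
		 */
		while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost_events))) {
			if (lost_events)
				pr_info("cpu %d: %lu events lost before ts %llu\n",
					cpu, lost_events,
					(unsigned long long)ts);
			/* process ring_buffer_event_data(event) here */
		}
	}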
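The ring_buffer_read_prepare / ring_buffer_read_prepare_sync / ring_buffer_read_start split lets many per-cpu iterators share a single synchronize_sched() grace period instead of paying for one per cpu. A sketch of the intended calling sequence, assuming the caller supplies an iters[] array sized for the online cpus and later tears each iterator down with the existing ring_buffer_read_finish() (the "ring_buffer_finish" the comments refer to):

	#include <linux/ring_buffer.h>
	#include <linux/cpumask.h>

	static void start_all_iters(struct ring_buffer *buffer,
				    struct ring_buffer_iter **iters)
	{
		int cpu;

		/* Step 1: allocate iterators and disable recording per cpu */
		for_each_online_cpu(cpu)
			iters[cpu] = ring_buffer_read_prepare(buffer, cpu);

		/* Step 2: one RCU-sched grace period covers all of the above */
		ring_buffer_read_prepare_sync();

		/* Step 3: reset each iterator to its reader page; a NULL
		 * iterator from a failed prepare is ignored by read_start */
		for_each_online_cpu(cpu)
			ring_buffer_read_start(iters[cpu]);
	}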
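When ring_buffer_read_page() hands out a sub-buffer, the new RB_MISSED_EVENTS/RB_MISSED_STORED bits ride in the page's commit field, and the missed count, when there is room, is appended after the event data. A sketch of how a consumer of the raw page could decode this; the data_page_hdr layout only mirrors what ring_buffer_print_page_header() advertises (it is not the kernel's private buffer_data_page), the count is a kernel unsigned long, and masking with just these two flag bits is an assumption.

	#include <stdint.h>
	#include <stdio.h>
	#include <string.h>

	#define RB_MISSED_EVENTS	(1UL << 31)	/* events were overwritten */
	#define RB_MISSED_STORED	(1UL << 30)	/* missed count stored at end */

	struct data_page_hdr {
		uint64_t	time_stamp;	/* page time stamp */
		unsigned long	commit;		/* committed bytes + flag bits */
		unsigned char	data[];		/* event records */
	};

	static void report_missed(const void *page)
	{
		const struct data_page_hdr *hdr = page;
		unsigned long commit = hdr->commit;
		unsigned long missed = 0;

		if (!(commit & RB_MISSED_EVENTS))
			return;

		/* Strip the flags to recover the real data length. */
		commit &= ~(RB_MISSED_EVENTS | RB_MISSED_STORED);

		/* The count is only present when there was room for it. */
		if (hdr->commit & RB_MISSED_STORED)
			memcpy(&missed, hdr->data + commit, sizeof(missed));

		printf("overwrite detected: %lu events missed\n", missed);
	}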
