...

Source file src/pkg/runtime/profbuf.go

     1	// Copyright 2017 The Go Authors.  All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package runtime
     6	
     7	import (
     8		"runtime/internal/atomic"
     9		"unsafe"
    10	)
    11	
    12	// A profBuf is a lock-free buffer for profiling events,
    13	// safe for concurrent use by one reader and one writer.
    14	// The writer may be a signal handler running without a user g.
    15	// The reader is assumed to be a user g.
    16	//
    17	// Each logged event corresponds to a fixed size header, a list of
    18	// uintptrs (typically a stack), and exactly one unsafe.Pointer tag.
    19	// The header and uintptrs are stored in the circular buffer data and the
    20	// tag is stored in a circular buffer tags, running in parallel.
    21	// In the circular buffer data, each event takes 2+hdrsize+len(stk)
    22	// words: the value 2+hdrsize+len(stk), then the time of the event, then
    23	// hdrsize words giving the fixed-size header, and then len(stk) words
    24	// for the stack.
    25	//
    26	// The current effective offsets into the tags and data circular buffers
    27	// for reading and writing are stored in the high 30 and low 32 bits of r and w.
    28	// The bottom bits of the high 32 are additional flag bits in w, unused in r.
    29	// "Effective" offsets means the total number of reads or writes, mod 2^length.
    30	// The offset in the buffer is the effective offset mod the length of the buffer.
    31	// To make wraparound mod 2^length match wraparound mod length of the buffer,
    32	// the length of the buffer must be a power of two.
    33	//
    34	// If the reader catches up to the writer, a flag passed to read controls
    35	// whether the read blocks until more data is available. A read returns a
    36	// pointer to the buffer data itself; the caller is assumed to be done with
    37	// that data at the next read. The read offset rNext tracks the next offset to
    38	// be returned by read. By definition, r ≤ rNext ≤ w (before wraparound),
    39	// and rNext is only used by the reader, so it can be accessed without atomics.
    40	//
    41	// If the writer gets ahead of the reader, so that the buffer fills,
    42	// future writes are discarded and replaced in the output stream by an
    43	// overflow entry, which has size 2+hdrsize+1, time set to the time of
    44	// the first discarded write, a header of all zeroed words, and a "stack"
    45	// containing one word, the number of discarded writes.
    46	//
    47	// Between the time the buffer fills and the buffer becomes empty enough
    48	// to hold more data, the overflow entry is stored as a pending overflow
    49	// entry in the fields overflow and overflowTime. The pending overflow
    50	// entry can be turned into a real record by either the writer or the
    51	// reader. If the writer is called to write a new record and finds that
    52	// the output buffer has room for both the pending overflow entry and the
    53	// new record, the writer emits the pending overflow entry and the new
    54	// record into the buffer. If the reader is called to read data and finds
    55	// that the output buffer is empty but that there is a pending overflow
    56	// entry, the reader will return a synthesized record for the pending
    57	// overflow entry.
    58	//
    59	// Only the writer can create or add to a pending overflow entry, but
    60	// either the reader or the writer can clear the pending overflow entry.
    61	// A pending overflow entry is indicated by the low 32 bits of 'overflow'
    62	// holding the number of discarded writes, and overflowTime holding the
    63	// time of the first discarded write. The high 32 bits of 'overflow'
    64	// increment each time the low 32 bits transition from zero to non-zero
    65	// or vice versa. This sequence number avoids ABA problems in the use of
    66	// compare-and-swap to coordinate between reader and writer.
    67	// The overflowTime is only written when the low 32 bits of overflow are
    68	// zero, that is, only when there is no pending overflow entry, in
    69	// preparation for creating a new one. The reader can therefore fetch and
    70	// clear the entry atomically using
    71	//
    72	//	for {
    73	//		overflow = load(&b.overflow)
    74	//		if uint32(overflow) == 0 {
    75	//			// no pending entry
    76	//			break
    77	//		}
    78	//		time = load(&b.overflowTime)
    79	//		if cas(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
    80	//			// pending entry cleared
    81	//			break
    82	//		}
    83	//	}
    84	//	if uint32(overflow) > 0 {
    85	//		emit entry for uint32(overflow), time
    86	//	}
    87	//
type profBuf struct {
	// accessed atomically
	//
	// r and w are the packed read and write offsets (see profIndex);
	// w additionally carries the profReaderSleeping and profWriteExtra flags.
	// overflow holds the pending discarded-write count in its low 32 bits
	// and a generation number in its high 32 bits; overflowTime is the time
	// of the first discarded write of the pending overflow entry.
	// eof is set to 1 by close.
	r, w         profAtomic
	overflow     uint64
	overflowTime uint64
	eof          uint32

	// immutable (excluding slice content)
	//
	// hdrsize is the number of header words in every record.
	// data and tags are the two circular buffers; newProfBuf rounds
	// both lengths up to a power of two.
	hdrsize uintptr
	data    []uint64
	tags    []unsafe.Pointer

	// owned by reader
	//
	// rNext is the offset just past the data handed out by the most
	// recent read; the following read publishes it to r.
	// wait is the note a blocking read sleeps on.
	rNext       profIndex
	overflowBuf []uint64 // for use by reader to return overflow record
	wait        note
}
   105	
// A profAtomic is the atomically-accessed word holding a profIndex.
// Its load, store, and cas methods wrap the corresponding 64-bit atomic
// operations and convert to and from profIndex.
type profAtomic uint64
   108	
// A profIndex is the packed tag and data counts and flag bits, described above.
// The data count occupies the low 32 bits, the flags profReaderSleeping and
// profWriteExtra are bits 32 and 33, and the tag count occupies the
// remaining high 30 bits.
type profIndex uint64
   111	
// Flag bits of a profIndex. They sit between the 32-bit data count and the
// 30-bit tag count, and are cleared by addCountsAndClearFlags.
const (
	profReaderSleeping profIndex = 1 << 32 // reader is sleeping and must be woken up
	profWriteExtra     profIndex = 1 << 33 // overflow or eof waiting
)
   116	
   117	func (x *profAtomic) load() profIndex {
   118		return profIndex(atomic.Load64((*uint64)(x)))
   119	}
   120	
   121	func (x *profAtomic) store(new profIndex) {
   122		atomic.Store64((*uint64)(x), uint64(new))
   123	}
   124	
   125	func (x *profAtomic) cas(old, new profIndex) bool {
   126		return atomic.Cas64((*uint64)(x), uint64(old), uint64(new))
   127	}
   128	
   129	func (x profIndex) dataCount() uint32 {
   130		return uint32(x)
   131	}
   132	
   133	func (x profIndex) tagCount() uint32 {
   134		return uint32(x >> 34)
   135	}
   136	
   137	// countSub subtracts two counts obtained from profIndex.dataCount or profIndex.tagCount,
   138	// assuming that they are no more than 2^29 apart (guaranteed since they are never more than
   139	// len(data) or len(tags) apart, respectively).
   140	// tagCount wraps at 2^30, while dataCount wraps at 2^32.
   141	// This function works for both.
   142	func countSub(x, y uint32) int {
   143		// x-y is 32-bit signed or 30-bit signed; sign-extend to 32 bits and convert to int.
   144		return int(int32(x-y) << 2 >> 2)
   145	}
   146	
   147	// addCountsAndClearFlags returns the packed form of "x + (data, tag) - all flags".
   148	func (x profIndex) addCountsAndClearFlags(data, tag int) profIndex {
   149		return profIndex((uint64(x)>>34+uint64(uint32(tag)<<2>>2))<<34 | uint64(uint32(x)+uint32(data)))
   150	}
   151	
   152	// hasOverflow reports whether b has any overflow records pending.
   153	func (b *profBuf) hasOverflow() bool {
   154		return uint32(atomic.Load64(&b.overflow)) > 0
   155	}
   156	
   157	// takeOverflow consumes the pending overflow records, returning the overflow count
   158	// and the time of the first overflow.
   159	// When called by the reader, it is racing against incrementOverflow.
   160	func (b *profBuf) takeOverflow() (count uint32, time uint64) {
   161		overflow := atomic.Load64(&b.overflow)
   162		time = atomic.Load64(&b.overflowTime)
   163		for {
   164			count = uint32(overflow)
   165			if count == 0 {
   166				time = 0
   167				break
   168			}
   169			// Increment generation, clear overflow count in low bits.
   170			if atomic.Cas64(&b.overflow, overflow, ((overflow>>32)+1)<<32) {
   171				break
   172			}
   173			overflow = atomic.Load64(&b.overflow)
   174			time = atomic.Load64(&b.overflowTime)
   175		}
   176		return uint32(overflow), time
   177	}
   178	
   179	// incrementOverflow records a single overflow at time now.
   180	// It is racing against a possible takeOverflow in the reader.
   181	func (b *profBuf) incrementOverflow(now int64) {
   182		for {
   183			overflow := atomic.Load64(&b.overflow)
   184	
   185			// Once we see b.overflow reach 0, it's stable: no one else is changing it underfoot.
   186			// We need to set overflowTime if we're incrementing b.overflow from 0.
   187			if uint32(overflow) == 0 {
   188				// Store overflowTime first so it's always available when overflow != 0.
   189				atomic.Store64(&b.overflowTime, uint64(now))
   190				atomic.Store64(&b.overflow, (((overflow>>32)+1)<<32)+1)
   191				break
   192			}
   193			// Otherwise we're racing to increment against reader
   194			// who wants to set b.overflow to 0.
   195			// Out of paranoia, leave 2³²-1 a sticky overflow value,
   196			// to avoid wrapping around. Extremely unlikely.
   197			if int32(overflow) == -1 {
   198				break
   199			}
   200			if atomic.Cas64(&b.overflow, overflow, overflow+1) {
   201				break
   202			}
   203		}
   204	}
   205	
   206	// newProfBuf returns a new profiling buffer with room for
   207	// a header of hdrsize words and a buffer of at least bufwords words.
   208	func newProfBuf(hdrsize, bufwords, tags int) *profBuf {
   209		if min := 2 + hdrsize + 1; bufwords < min {
   210			bufwords = min
   211		}
   212	
   213		// Buffer sizes must be power of two, so that we don't have to
   214		// worry about uint32 wraparound changing the effective position
   215		// within the buffers. We store 30 bits of count; limiting to 28
   216		// gives us some room for intermediate calculations.
   217		if bufwords >= 1<<28 || tags >= 1<<28 {
   218			throw("newProfBuf: buffer too large")
   219		}
   220		var i int
   221		for i = 1; i < bufwords; i <<= 1 {
   222		}
   223		bufwords = i
   224		for i = 1; i < tags; i <<= 1 {
   225		}
   226		tags = i
   227	
   228		b := new(profBuf)
   229		b.hdrsize = uintptr(hdrsize)
   230		b.data = make([]uint64, bufwords)
   231		b.tags = make([]unsafe.Pointer, tags)
   232		b.overflowBuf = make([]uint64, 2+b.hdrsize+1)
   233		return b
   234	}
   235	
   236	// canWriteRecord reports whether the buffer has room
   237	// for a single contiguous record with a stack of length nstk.
   238	func (b *profBuf) canWriteRecord(nstk int) bool {
   239		br := b.r.load()
   240		bw := b.w.load()
   241	
   242		// room for tag?
   243		if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 1 {
   244			return false
   245		}
   246	
   247		// room for data?
   248		nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
   249		want := 2 + int(b.hdrsize) + nstk
   250		i := int(bw.dataCount() % uint32(len(b.data)))
   251		if i+want > len(b.data) {
   252			// Can't fit in trailing fragment of slice.
   253			// Skip over that and start over at beginning of slice.
   254			nd -= len(b.data) - i
   255		}
   256		return nd >= want
   257	}
   258	
   259	// canWriteTwoRecords reports whether the buffer has room
   260	// for two records with stack lengths nstk1, nstk2, in that order.
   261	// Each record must be contiguous on its own, but the two
   262	// records need not be contiguous (one can be at the end of the buffer
   263	// and the other can wrap around and start at the beginning of the buffer).
   264	func (b *profBuf) canWriteTwoRecords(nstk1, nstk2 int) bool {
   265		br := b.r.load()
   266		bw := b.w.load()
   267	
   268		// room for tag?
   269		if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 2 {
   270			return false
   271		}
   272	
   273		// room for data?
   274		nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
   275	
   276		// first record
   277		want := 2 + int(b.hdrsize) + nstk1
   278		i := int(bw.dataCount() % uint32(len(b.data)))
   279		if i+want > len(b.data) {
   280			// Can't fit in trailing fragment of slice.
   281			// Skip over that and start over at beginning of slice.
   282			nd -= len(b.data) - i
   283			i = 0
   284		}
   285		i += want
   286		nd -= want
   287	
   288		// second record
   289		want = 2 + int(b.hdrsize) + nstk2
   290		if i+want > len(b.data) {
   291			// Can't fit in trailing fragment of slice.
   292			// Skip over that and start over at beginning of slice.
   293			nd -= len(b.data) - i
   294			i = 0
   295		}
   296		return nd >= want
   297	}
   298	
// write writes an entry to the profiling buffer b.
// The entry consists of the time stamp now, a fixed header of b.hdrsize
// words, a variable-sized stack stk, and a single tag pointer *tagPtr
// (or nil if tagPtr is nil).
// hdr may be shorter than b.hdrsize, in which case the header is
// zero-padded; a longer hdr is a fatal error.
// A nil b makes write a no-op.
//
// If the buffer has no room for the entry, or an overflow entry is pending
// and there is no room for both it and the new entry, the entry is dropped
// and counted in the pending overflow entry instead.
//
// No write barriers allowed because this might be called from a signal handler.
func (b *profBuf) write(tagPtr *unsafe.Pointer, now int64, hdr []uint64, stk []uintptr) {
	if b == nil {
		return
	}
	if len(hdr) > int(b.hdrsize) {
		throw("misuse of profBuf.write")
	}

	if hasOverflow := b.hasOverflow(); hasOverflow && b.canWriteTwoRecords(1, len(stk)) {
		// Room for both an overflow record and the one being written.
		// Write the overflow record if the reader hasn't gotten to it yet.
		// Only racing against reader, not other writers.
		count, time := b.takeOverflow()
		if count > 0 {
			// The overflow record has a nil tag, an all-zero header, and a
			// one-word "stack" holding the number of discarded writes.
			var stk [1]uintptr
			stk[0] = uintptr(count)
			b.write(nil, int64(time), nil, stk[:])
		}
	} else if hasOverflow || !b.canWriteRecord(len(stk)) {
		// Pending overflow without room to write overflow and new records
		// or no overflow but also no room for new record.
		b.incrementOverflow(now)
		b.wakeupExtra()
		return
	}

	// There's room: write the record.
	br := b.r.load()
	bw := b.w.load()

	// Profiling tag
	//
	// The tag is a pointer, but we can't run a write barrier here.
	// We have interrupted the OS-level execution of gp, but the
	// runtime still sees gp as executing. In effect, we are running
	// in place of the real gp. Since gp is the only goroutine that
	// can overwrite gp.labels, the value of gp.labels is stable during
	// this signal handler: it will still be reachable from gp when
	// we finish executing. If a GC is in progress right now, it must
	// keep gp.labels alive, because gp.labels is reachable from gp.
	// If gp were to overwrite gp.labels, the deletion barrier would
	// still shade that pointer, which would preserve it for the
	// in-progress GC, so all is well. Any future GC will see the
	// value we copied when scanning b.tags (heap-allocated).
	// We arrange that the store here is always overwriting a nil,
	// so there is no need for a deletion barrier on b.tags[wt].
	wt := int(bw.tagCount() % uint32(len(b.tags)))
	if tagPtr != nil {
		// Store through a *uintptr so that the assignment is not a pointer write.
		*(*uintptr)(unsafe.Pointer(&b.tags[wt])) = uintptr(unsafe.Pointer(*tagPtr))
	}

	// Main record.
	// It has to fit in a contiguous section of the slice, so if it doesn't fit at the end,
	// leave a rewind marker (0) and start over at the beginning of the slice.
	wd := int(bw.dataCount() % uint32(len(b.data)))
	// nd is the number of free data words; it is kept up to date below
	// but not consulted again in this function.
	nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data)
	skip := 0
	if wd+2+int(b.hdrsize)+len(stk) > len(b.data) {
		b.data[wd] = 0
		skip = len(b.data) - wd
		nd -= skip
		wd = 0
	}
	data := b.data[wd:]
	data[0] = uint64(2 + b.hdrsize + uintptr(len(stk))) // length
	data[1] = uint64(now)                               // time stamp
	// header, zero-padded
	i := uintptr(copy(data[2:2+b.hdrsize], hdr))
	for ; i < b.hdrsize; i++ {
		data[2+i] = 0
	}
	for i, pc := range stk {
		data[2+b.hdrsize+uintptr(i)] = uint64(pc)
	}

	for {
		// Commit write.
		// Racing with reader setting flag bits in b.w, to avoid lost wakeups.
		// The new value advances the data count past any skipped tail plus
		// the record, advances the tag count by one, and clears the flags.
		old := b.w.load()
		new := old.addCountsAndClearFlags(skip+2+len(stk)+int(b.hdrsize), 1)
		if !b.w.cas(old, new) {
			continue
		}
		// If there was a reader, wake it up.
		if old&profReaderSleeping != 0 {
			notewakeup(&b.wait)
		}
		break
	}
}
   394	
   395	// close signals that there will be no more writes on the buffer.
   396	// Once all the data has been read from the buffer, reads will return eof=true.
   397	func (b *profBuf) close() {
   398		if atomic.Load(&b.eof) > 0 {
   399			throw("runtime: profBuf already closed")
   400		}
   401		atomic.Store(&b.eof, 1)
   402		b.wakeupExtra()
   403	}
   404	
   405	// wakeupExtra must be called after setting one of the "extra"
   406	// atomic fields b.overflow or b.eof.
   407	// It records the change in b.w and wakes up the reader if needed.
   408	func (b *profBuf) wakeupExtra() {
   409		for {
   410			old := b.w.load()
   411			new := old | profWriteExtra
   412			if !b.w.cas(old, new) {
   413				continue
   414			}
   415			if old&profReaderSleeping != 0 {
   416				notewakeup(&b.wait)
   417			}
   418			break
   419		}
   420	}
   421	
// profBufReadMode specifies whether to block when no data is available to read.
type profBufReadMode int

const (
	// profBufBlocking makes read sleep until data, an overflow entry,
	// or eof is available.
	profBufBlocking profBufReadMode = iota
	// profBufNonBlocking makes read return immediately with no data
	// and eof=false when nothing is available.
	profBufNonBlocking
)
   429	
// overflowTag is the one-element tag slice that read returns together
// with a synthesized overflow record.
var overflowTag [1]unsafe.Pointer // always nil
   431	
// read returns the next batch of whole records from b, together with the
// matching tags (one per record).
//
// The returned slices alias b's internal buffers (or b.overflowBuf and
// overflowTag for a synthesized overflow record). Calling read again
// commits the previous read: the tags handed out earlier are cleared and
// that part of the ring is returned to the writer.
//
// When no data is available, read returns, in order of preference:
// a synthesized overflow record if an overflow entry is pending;
// eof=true if the buffer has been closed; otherwise, in mode
// profBufNonBlocking, nil slices with eof=false, and in mode
// profBufBlocking it sleeps until the writer wakes it and then retries.
//
// A nil b reads as an immediate eof.
func (b *profBuf) read(mode profBufReadMode) (data []uint64, tags []unsafe.Pointer, eof bool) {
	if b == nil {
		return nil, nil, true
	}

	br := b.rNext

	// Commit previous read, returning that part of the ring to the writer.
	// First clear tags that have now been read, both to avoid holding
	// up the memory they point at for longer than necessary
	// and so that b.write can assume it is always overwriting
	// nil tag entries (see comment in b.write).
	rPrev := b.r.load()
	if rPrev != br {
		ntag := countSub(br.tagCount(), rPrev.tagCount())
		ti := int(rPrev.tagCount() % uint32(len(b.tags)))
		for i := 0; i < ntag; i++ {
			b.tags[ti] = nil
			if ti++; ti == len(b.tags) {
				ti = 0
			}
		}
		b.r.store(br)
	}

Read:
	bw := b.w.load()
	numData := countSub(bw.dataCount(), br.dataCount())
	if numData == 0 {
		if b.hasOverflow() {
			// No data to read, but there is overflow to report.
			// Racing with writer flushing b.overflow into a real record.
			count, time := b.takeOverflow()
			if count == 0 {
				// Lost the race, go around again.
				goto Read
			}
			// Won the race, report overflow.
			// The record is built in b.overflowBuf: length, time,
			// zeroed header, and the discarded-write count as its "stack".
			dst := b.overflowBuf
			dst[0] = uint64(2 + b.hdrsize + 1)
			dst[1] = uint64(time)
			for i := uintptr(0); i < b.hdrsize; i++ {
				dst[2+i] = 0
			}
			dst[2+b.hdrsize] = uint64(count)
			return dst[:2+b.hdrsize+1], overflowTag[:1], false
		}
		if atomic.Load(&b.eof) > 0 {
			// No data, no overflow, EOF set: done.
			return nil, nil, true
		}
		if bw&profWriteExtra != 0 {
			// Writer claims to have published extra information (overflow or eof).
			// Attempt to clear notification and then check again.
			// If we fail to clear the notification it means b.w changed,
			// so we still need to check again.
			b.w.cas(bw, bw&^profWriteExtra)
			goto Read
		}

		// Nothing to read right now.
		// Return or sleep according to mode.
		if mode == profBufNonBlocking {
			return nil, nil, false
		}
		// Announce the sleep in b.w so the writer knows to wake us.
		// If b.w changed since it was loaded, look again instead of sleeping.
		if !b.w.cas(bw, bw|profReaderSleeping) {
			goto Read
		}
		// Committed to sleeping.
		notetsleepg(&b.wait, -1)
		noteclear(&b.wait)
		goto Read
	}
	// Start with the contiguous run from the read position to the end of
	// the slice, limited to what has been written.
	data = b.data[br.dataCount()%uint32(len(b.data)):]
	if len(data) > numData {
		data = data[:numData]
	} else {
		numData -= len(data) // available in case of wraparound
	}
	skip := 0
	if data[0] == 0 {
		// Wraparound record. Go back to the beginning of the ring.
		// skip accounts for the unused tail so that rNext moves past it.
		skip = len(data)
		data = b.data
		if len(data) > numData {
			data = data[:numData]
		}
	}

	ntag := countSub(bw.tagCount(), br.tagCount())
	if ntag == 0 {
		throw("runtime: malformed profBuf buffer - tag and data out of sync")
	}
	tags = b.tags[br.tagCount()%uint32(len(b.tags)):]
	if len(tags) > ntag {
		tags = tags[:ntag]
	}

	// Count out whole data records until either data or tags is done.
	// They are always in sync in the buffer, but due to an end-of-slice
	// wraparound we might need to stop early and return the rest
	// in the next call.
	di := 0
	ti := 0
	for di < len(data) && data[di] != 0 && ti < len(tags) {
		// data[di] is the record's length word; it must not run past
		// the data available in this batch.
		if uintptr(di)+uintptr(data[di]) > uintptr(len(data)) {
			throw("runtime: malformed profBuf buffer - invalid size")
		}
		di += int(data[di])
		ti++
	}

	// Remember how much we returned, to commit read on next call.
	b.rNext = br.addCountsAndClearFlags(skip+di, ti)

	if raceenabled {
		// Match racereleasemerge in runtime_setProfLabel,
		// so that the setting of the labels in runtime_setProfLabel
		// is treated as happening before any use of the labels
		// by our caller. The synchronization on labelSync itself is a fiction
		// for the race detector. The actual synchronization is handled
		// by the fact that the signal handler only reads from the current
		// goroutine and uses atomics to write the updated queue indices,
		// and then the read-out from the signal handler buffer uses
		// atomics to read those queue indices.
		raceacquire(unsafe.Pointer(&labelSync))
	}

	return data[:di], tags[:ti], false
}
   562	

View as plain text