...

Source file src/sync/poolqueue.go

     1	// Copyright 2019 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package sync
     6	
     7	import (
     8		"sync/atomic"
     9		"unsafe"
    10	)
    11	
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	//
	// Packing both indexes into one word lets them be loaded and
	// compare-and-swapped together, which pushHead, popHead, and
	// popTail all rely on.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}
    46	
// eface mirrors the two-word memory layout of an empty interface
// (interface{}): a type pointer and a value pointer. pushHead and the
// pop methods store and load whole interface values through this
// layout via unsafe casts, while popTail additionally clears the two
// words separately so that typ can serve as an atomic "slot in use"
// flag.
type eface struct {
	typ, val unsafe.Pointer
}
    50	
    51	const dequeueBits = 32
    52	
// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4
    59	
// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}
    64	
    65	func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
    66		const mask = 1<<dequeueBits - 1
    67		head = uint32((ptrs >> dequeueBits) & mask)
    68		tail = uint32(ptrs & mask)
    69		return
    70	}
    71	
    72	func (d *poolDequeue) pack(head, tail uint32) uint64 {
    73		const mask = 1<<dequeueBits - 1
    74		return (uint64(head) << dequeueBits) |
    75			uint64(tail&mask)
    76	}
    77	
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// The ring is full when head has advanced a full len(d.vals)
	// slots past tail (indexes compared modulo 2^dequeueBits).
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	// len(d.vals) is a power of 2, so &(len-1) is head mod len.
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		// nil marks an empty slot, so a caller-supplied nil
		// must be encoded as the dequeueNil sentinel.
		val = dequeueNil(nil)
	}
	// Store the whole interface value into the slot; eface has the
	// same two-word layout as interface{}.
	*(*interface{})(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}
   108	
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
		// The CAS failed, which means a concurrent popTail
		// moved tail (we are the only goroutine touching
		// head). Reload headTail and retry.
	}

	// Read the value out of the slot; eface has the same two-word
	// layout as interface{}.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		// Decode the sentinel back to a real nil.
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
   143	
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
		// The CAS failed: headTail changed under us (another
		// consumer claimed the tail, or the producer pushed
		// or popped). Reload and retry.
	}

	// We now own slot.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		// Decode the sentinel back to a real nil.
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}
   186	
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic (see
	// loadPoolChainElt/storePoolChainElt).
	tail *poolChainElt
}
   203	
// poolChainElt is one link in a poolChain: a poolDequeue (embedded,
// so its push/pop methods are promoted) plus the list links.
type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}
   219	
// storePoolChainElt atomically stores v into *pp. It pairs with
// loadPoolChainElt to synchronize the poolChainElt pointers shared
// between producer and consumers.
func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}
   223	
// loadPoolChainElt atomically loads and returns *pp. It pairs with
// storePoolChainElt.
func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
	return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}
   227	
// pushHead adds val at the head of the chain, growing the chain with
// a new, doubled-size dequeue when the current head dequeue is full.
// It must only be called by a single producer.
func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		// c.head is producer-private; c.tail is shared with
		// consumers and must be published atomically.
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	if d.pushHead(val) {
		return
	}

	// The current dequeue is full. Allocate a new one of twice
	// the size.
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	// Publish d2 to consumers (via d.next) only after it is fully
	// initialized.
	storePoolChainElt(&d.next, d2)
	// This push cannot fail: d2 is freshly allocated and empty,
	// and only this single producer pushes to it.
	d2.pushHead(val)
}
   257	
   258	func (c *poolChain) popHead() (interface{}, bool) {
   259		d := c.head
   260		for d != nil {
   261			if val, ok := d.popHead(); ok {
   262				return val, ok
   263			}
   264			// There may still be unconsumed elements in the
   265			// previous dequeue, so try backing up.
   266			d = loadPoolChainElt(&d.prev)
   267		}
   268		return nil, false
   269	}
   270	
// popTail removes and returns the element at the tail of the chain,
// dropping drained dequeues from the chain as it goes. It returns
// false if the chain is empty. It may be called by any number of
// consumers.
func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		// The chain was never initialized.
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		}
		// Whether or not we won the CAS (a losing racer
		// advanced c.tail for us), d is drained; continue
		// with d2.
		d = d2
	}
}
   310	

View as plain text