
Source file src/sync/pool.go

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"internal/race"
	"runtime"
	"sync/atomic"
	"unsafe"
)

// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}
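
// Editorial example (not part of the original file): a minimal usage sketch
// in the style of the fmt buffer store described above, written from a
// client package that imports bytes, fmt, and sync. The names bufPool and
// greet are hypothetical.
//
//	var bufPool = sync.Pool{
//		New: func() interface{} { return new(bytes.Buffer) },
//	}
//
//	func greet(name string) string {
//		b := bufPool.Get().(*bytes.Buffer)
//		b.Reset() // a reused buffer keeps its old contents; always reset
//		defer bufPool.Put(b)
//		fmt.Fprintf(b, "Hello, %s!", name)
//		return b.String() // evaluated before the deferred Put runs
//	}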

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{} // Can be used only by the respective P.
	shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0.
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
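
// Editorial worked example (not part of the original file; assumes a 64-bit
// platform where poolLocalInternal is 32 bytes: a 16-byte interface value
// plus a two-pointer poolChain): pad is then 128-32 = 96 bytes, so each
// poolLocal occupies exactly 128 bytes and two Ps never share a line on
// CPUs whose cache line size divides 128 (e.g. 32, 64, or 128 bytes).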

// from runtime
func fastrand() uint32

var poolRaceHash [128]uint64

// poolRaceAddr returns an address to use as the synchronization point
// for race detector logic. We don't use the actual pointer stored in x
// directly, for fear of conflicting with other synchronization on that address.
// Instead, we hash the pointer to get an index into poolRaceHash.
// See discussion on golang.org/cl/31589.
func poolRaceAddr(x interface{}) unsafe.Pointer {
	ptr := uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&x))[1])
	h := uint32((uint64(uint32(ptr)) * 0x85ebca6b) >> 16)
	return unsafe.Pointer(&poolRaceHash[h%uint32(len(poolRaceHash))])
}
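
// Editorial note (not part of the original file): 0x85ebca6b is one of the
// MurmurHash3 finalizer constants. The low 32 bits of the pointer are
// multiplied by it and bits 16..47 of the 64-bit product are kept, so
// nearby addresses spread across the 128 poolRaceHash slots.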

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l, _ := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}
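
// Editorial note (not part of the original file): Put has two tiers. While
// the goroutine is pinned to its P, the private slot takes the value with
// no synchronization if it is empty; otherwise the value is pushed onto the
// head of the P-local shared chain, from whose tail other Ps may steal.
// Under the race detector, roughly one Put in four is deliberately dropped
// to flush out callers that assume a Put value will come back from Get.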

// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}
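
// Editorial note (not part of the original file): Get searches in order of
// increasing cost: the pinned P's private slot, then the head of its shared
// chain (the most recently pushed and therefore likely cache-hot item),
// then getSlow, which steals from other Ps and finally consults the victim
// cache. Only if all of those miss does Get fall back to p.New.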

func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local                        // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty so future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}
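
// Editorial worked example (not part of the original file): with
// GOMAXPROCS=4 and pid=1, the steal loop indexes (pid+i+1)%size for
// i=0..3 and so visits Ps 2, 3, 0, 1, starting at the next P and wrapping
// around. The victim pass indexes (pid+i)%size and so visits 1, 2, 3, 0,
// starting with the caller's own victim slot.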

// pin pins the current goroutine to P, disables preemption and
// returns the poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large as localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Cannot lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid], pid
}
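
// Editorial note (not part of the original file): the store-release pair in
// pinSlow (local first, then localSize) mirrors the load order in pin
// (localSize first, then local), so any goroutine that observes the new
// localSize is guaranteed to also observe the fully initialized local array.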

func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.

	// Because the world is stopped, no pool user can be in a
	// pinned section (in effect, this has all Ps pinned).

	// Drop victim caches from all pools.
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// The pools with non-empty primary caches now have non-empty
	// victim caches and no pools have primary caches.
	oldPools, allPools = allPools, nil
}
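
// Editorial note (not part of the original file): this two-generation
// scheme bounds an idle object's lifetime to about two GC cycles. An object
// Put before GC N moves from the primary cache to the victim cache at GC N
// and, if still unclaimed, is dropped and becomes collectable at GC N+1.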

var (
	allPoolsMu Mutex

	// allPools is the set of pools that have non-empty primary
	// caches. Protected by either 1) allPoolsMu and pinning or 2)
	// STW.
	allPools []*Pool

	// oldPools is the set of pools that may have non-empty victim
	// caches. Protected by STW.
	oldPools []*Pool
)

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}
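
// Editorial note (not part of the original file): indexLocal computes
// &local[i] by hand: the base pointer plus i times the 128-byte element
// size. Pointer arithmetic is needed because the array length P is only
// known at run time, so local cannot carry a fixed-size array type.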

// Implemented in runtime.
func runtime_registerPoolCleanup(cleanup func())
func runtime_procPin()
func runtime_procUnpin()
