...

Source file src/cmd/go/internal/par/work.go

// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package par implements parallel execution helpers.
package par

import (
	"math/rand"
	"sync"
	"sync/atomic"
)

// Work manages a set of work items to be executed in parallel, at most once each.
// The items in the set must all be valid map keys.
type Work struct {
	f       func(interface{}) // function to run for each item
	running int               // total number of runners

	mu      sync.Mutex
	added   map[interface{}]bool // items added to set
	todo    []interface{}        // items yet to be run
	wait    sync.Cond            // wait when todo is empty
	waiting int                  // number of runners waiting for todo
}

func (w *Work) init() {
	if w.added == nil {
		w.added = make(map[interface{}]bool)
	}
}

// Add adds item to the work set, if it hasn't already been added.
func (w *Work) Add(item interface{}) {
	w.mu.Lock()
	w.init()
	if !w.added[item] {
		w.added[item] = true
		w.todo = append(w.todo, item)
		if w.waiting > 0 {
			w.wait.Signal()
		}
	}
	w.mu.Unlock()
}

// Do runs f in parallel on items from the work set,
// with at most n invocations of f running at a time.
// It returns when everything added to the work set has been processed.
// At least one item should have been added to the work set
// before calling Do (or else Do returns immediately),
// but it is allowed for f(item) to add new items to the set.
// Do should only be used once on a given Work.
func (w *Work) Do(n int, f func(item interface{})) {
	if n < 1 {
		panic("par.Work.Do: n < 1")
	}
	if w.running >= 1 {
		panic("par.Work.Do: already called Do")
	}

	w.running = n
	w.f = f
	w.wait.L = &w.mu

	// Start n-1 extra runner goroutines; the calling goroutine acts as the nth runner.
	for i := 0; i < n-1; i++ {
		go w.runner()
	}
	w.runner()
}
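
A minimal usage sketch, not part of the original file: it assumes a hypothetical sibling test file (say work_example_test.go) in the same package. The work function adds newly discovered items while Do is running, and each item is processed exactly once.

package par

import (
	"fmt"
	"sort"
	"sync"
)

func ExampleWork_crawl() {
	// A tiny dependency graph; crawling "a" should visit every node once.
	deps := map[string][]string{
		"a": {"b", "c"},
		"b": {"c"},
		"c": {},
	}

	var mu sync.Mutex
	var visited []string

	var w Work
	w.Add("a")
	w.Do(4, func(item interface{}) {
		name := item.(string)

		mu.Lock()
		visited = append(visited, name)
		mu.Unlock()

		// Adding from inside f is allowed; duplicates are ignored.
		for _, dep := range deps[name] {
			w.Add(dep)
		}
	})

	sort.Strings(visited)
	fmt.Println(visited)
	// Output: [a b c]
}

Because Do only returns once every runner has gone idle, items added from inside f are guaranteed to be processed before Do returns.
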

// runner executes work in w until both nothing is left to do
// and all the runners are waiting for work.
// (Then all the runners return.)
func (w *Work) runner() {
	for {
		// Wait for something to do.
		w.mu.Lock()
		for len(w.todo) == 0 {
			w.waiting++
			if w.waiting == w.running {
				// All done.
				w.wait.Broadcast()
				w.mu.Unlock()
				return
			}
			w.wait.Wait()
			w.waiting--
		}

		// Pick something to do at random,
		// to eliminate pathological contention
		// in case items added at about the same time
		// are most likely to contend.
		i := rand.Intn(len(w.todo))
		item := w.todo[i]
		w.todo[i] = w.todo[len(w.todo)-1]
		w.todo = w.todo[:len(w.todo)-1]
		w.mu.Unlock()

		w.f(item)
	}
}

// Cache runs an action once per key and caches the result.
type Cache struct {
	m sync.Map
}

type cacheEntry struct {
	done   uint32
	mu     sync.Mutex
	result interface{}
}

// Do calls the function f if and only if Do is being called for the first time with this key.
// No call to Do with a given key returns until the one call to f returns.
// Do returns the value returned by the one call to f.
func (c *Cache) Do(key interface{}, f func() interface{}) interface{} {
	entryIface, ok := c.m.Load(key)
	if !ok {
		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry))
	}
	e := entryIface.(*cacheEntry)
	// Double-checked locking: the atomic done flag gives a lock-free fast path
	// once the result exists; the mutex ensures f runs at most once.
	if atomic.LoadUint32(&e.done) == 0 {
		e.mu.Lock()
		if atomic.LoadUint32(&e.done) == 0 {
			e.result = f()
			atomic.StoreUint32(&e.done, 1)
		}
		e.mu.Unlock()
	}
	return e.result
}
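
A minimal sketch of the Cache.Do contract, again as a hypothetical test file in this package: ten goroutines request the same key, the supplied function runs exactly once, and every caller observes the same result.

package par

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func ExampleCache_Do() {
	var c Cache
	var calls int32

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every caller blocks until the single call to f finishes,
			// then observes its result.
			v := c.Do("answer", func() interface{} {
				atomic.AddInt32(&calls, 1)
				return 42
			}).(int)
			if v != 42 {
				panic("unexpected result")
			}
		}()
	}
	wg.Wait()

	fmt.Println(atomic.LoadInt32(&calls), c.Get("answer"))
	// Output: 1 42
}
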

// Get returns the cached result associated with key.
// It returns nil if there is no such result.
// If the result for key is being computed, Get does not wait for the computation to finish.
func (c *Cache) Get(key interface{}) interface{} {
	entryIface, ok := c.m.Load(key)
	if !ok {
		return nil
	}
	e := entryIface.(*cacheEntry)
	if atomic.LoadUint32(&e.done) == 0 {
		return nil
	}
	return e.result
}
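
A sketch of the non-blocking Get semantics described above, in the same hypothetical test-file setting: while the computation for a key is still running, Get reports no result instead of waiting for it.

package par

import "fmt"

func ExampleCache_Get() {
	var c Cache
	started := make(chan struct{})
	release := make(chan struct{})

	// Start a computation that blocks until we release it.
	go c.Do("slow", func() interface{} {
		close(started)
		<-release
		return "done"
	})

	<-started
	// The entry exists but is not finished, so Get does not wait for it.
	fmt.Println(c.Get("slow"))

	close(release)
	// Output: <nil>
}
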

// Clear removes all entries in the cache.
//
// Concurrent calls to Get may return old values. Concurrent calls to Do
// may return old values or store results in entries that have been deleted.
//
// TODO(jayconrod): Delete this after the package cache clearing functions
// in internal/load have been removed.
func (c *Cache) Clear() {
	c.m.Range(func(key, value interface{}) bool {
		c.m.Delete(key)
		return true
	})
}

// Delete removes an entry from the map. It is safe to call Delete for an
// entry that does not exist. Delete will return quickly, even if the result
// for a key is still being computed; the computation will finish, but the
// result won't be accessible through the cache.
//
// TODO(jayconrod): Delete this after the package cache clearing functions
// in internal/load have been removed.
func (c *Cache) Delete(key interface{}) {
	c.m.Delete(key)
}

// DeleteIf calls pred for each key in the map. If pred returns true for a key,
// DeleteIf removes the corresponding entry. If the result for a key is
// still being computed, DeleteIf will remove the entry without waiting for
// the computation to finish. The result won't be accessible through the cache.
//
// TODO(jayconrod): Delete this after the package cache clearing functions
// in internal/load have been removed.
func (c *Cache) DeleteIf(pred func(key interface{}) bool) {
	c.m.Range(func(key, _ interface{}) bool {
		if pred(key) {
			c.Delete(key)
		}
		return true
	})
}
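
A sketch of DeleteIf, in the same hypothetical test-file setting: entries whose keys match the predicate are removed, while other entries remain visible through Get.

package par

import "fmt"

func ExampleCache_DeleteIf() {
	var c Cache
	c.Do("keep", func() interface{} { return 1 })
	c.Do("drop", func() interface{} { return 2 })

	// Remove only the entries whose keys match the predicate.
	c.DeleteIf(func(key interface{}) bool { return key == "drop" })

	fmt.Println(c.Get("keep"), c.Get("drop"))
	// Output: 1 <nil>
}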