...

Source file src/pkg/sync/waitgroup.go

     1	// Copyright 2011 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package sync
     6	
     7	import (
     8		"internal/race"
     9		"sync/atomic"
    10		"unsafe"
    11	)
    12	
    13	// A WaitGroup waits for a collection of goroutines to finish.
    14	// The main goroutine calls Add to set the number of
    15	// goroutines to wait for. Then each of the goroutines
    16	// runs and calls Done when finished. At the same time,
    17	// Wait can be used to block until all goroutines have finished.
    18	//
    19	// A WaitGroup must not be copied after first use.
    20	type WaitGroup struct {
    21		noCopy noCopy
    22	
    23		// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    24		// 64-bit atomic operations require 64-bit alignment, but 32-bit
    25		// compilers do not ensure it. So we allocate 12 bytes and then use
    26		// the aligned 8 bytes in them as state, and the other 4 as storage
    27		// for the sema.
    28		state1 [3]uint32
    29	}
    30	
    31	// state returns pointers to the state and sema fields stored within wg.state1.
    32	func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    33		if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
    34			return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    35		} else {
    36			return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    37		}
    38	}
    39	
    40	// Add adds delta, which may be negative, to the WaitGroup counter.
    41	// If the counter becomes zero, all goroutines blocked on Wait are released.
    42	// If the counter goes negative, Add panics.
    43	//
    44	// Note that calls with a positive delta that occur when the counter is zero
    45	// must happen before a Wait. Calls with a negative delta, or calls with a
    46	// positive delta that start when the counter is greater than zero, may happen
    47	// at any time.
    48	// Typically this means the calls to Add should execute before the statement
    49	// creating the goroutine or other event to be waited for.
    50	// If a WaitGroup is reused to wait for several independent sets of events,
    51	// new Add calls must happen after all previous Wait calls have returned.
    52	// See the WaitGroup example.
    53	func (wg *WaitGroup) Add(delta int) {
    54		statep, semap := wg.state()
    55		if race.Enabled {
    56			_ = *statep // trigger nil deref early
    57			if delta < 0 {
    58				// Synchronize decrements with Wait.
    59				race.ReleaseMerge(unsafe.Pointer(wg))
    60			}
    61			race.Disable()
    62			defer race.Enable()
    63		}
    64		state := atomic.AddUint64(statep, uint64(delta)<<32)
    65		v := int32(state >> 32)
    66		w := uint32(state)
    67		if race.Enabled && delta > 0 && v == int32(delta) {
    68			// The first increment must be synchronized with Wait.
    69			// Need to model this as a read, because there can be
    70			// several concurrent wg.counter transitions from 0.
    71			race.Read(unsafe.Pointer(semap))
    72		}
    73		if v < 0 {
    74			panic("sync: negative WaitGroup counter")
    75		}
    76		if w != 0 && delta > 0 && v == int32(delta) {
    77			panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    78		}
    79		if v > 0 || w == 0 {
    80			return
    81		}
    82		// This goroutine has set counter to 0 when waiters > 0.
    83		// Now there can't be concurrent mutations of state:
    84		// - Adds must not happen concurrently with Wait,
    85		// - Wait does not increment waiters if it sees counter == 0.
    86		// Still do a cheap sanity check to detect WaitGroup misuse.
    87		if *statep != state {
    88			panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    89		}
    90		// Reset waiters count to 0.
    91		*statep = 0
    92		for ; w != 0; w-- {
    93			runtime_Semrelease(semap, false, 0)
    94		}
    95	}
    96	
    97	// Done decrements the WaitGroup counter by one.
    98	func (wg *WaitGroup) Done() {
    99		wg.Add(-1)
   100	}
   101	
   102	// Wait blocks until the WaitGroup counter is zero.
   103	func (wg *WaitGroup) Wait() {
   104		statep, semap := wg.state()
   105		if race.Enabled {
   106			_ = *statep // trigger nil deref early
   107			race.Disable()
   108		}
   109		for {
   110			state := atomic.LoadUint64(statep)
   111			v := int32(state >> 32)
   112			w := uint32(state)
   113			if v == 0 {
   114				// Counter is 0, no need to wait.
   115				if race.Enabled {
   116					race.Enable()
   117					race.Acquire(unsafe.Pointer(wg))
   118				}
   119				return
   120			}
   121			// Increment waiters count.
   122			if atomic.CompareAndSwapUint64(statep, state, state+1) {
   123				if race.Enabled && w == 0 {
   124					// Wait must be synchronized with the first Add.
   125					// Need to model this is as a write to race with the read in Add.
   126					// As a consequence, can do the write only for the first waiter,
   127					// otherwise concurrent Waits will race with each other.
   128					race.Write(unsafe.Pointer(semap))
   129				}
   130				runtime_Semacquire(semap)
   131				if *statep != 0 {
   132					panic("sync: WaitGroup is reused before previous Wait has returned")
   133				}
   134				if race.Enabled {
   135					race.Enable()
   136					race.Acquire(unsafe.Pointer(wg))
   137				}
   138				return
   139			}
   140		}
   141	}
   142	

View as plain text