...

Source file src/runtime/sema.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Semaphore implementation exposed to Go.
     6	// Intended use is to provide a sleep and wakeup
     7	// primitive that can be used in the contended case
     8	// of other synchronization primitives.
     9	// Thus it targets the same goal as Linux's futex,
    10	// but it has much simpler semantics.
    11	//
    12	// That is, don't think of these as semaphores.
    13	// Think of them as a way to implement sleep and wakeup
    14	// such that every sleep is paired with a single wakeup,
    15	// even if, due to races, the wakeup happens before the sleep.
    16	//
    17	// See Mullender and Cox, "Semaphores in Plan 9",
    18	// https://swtch.com/semaphore.pdf
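//
// For example (a sketch of the callers in package sync, not a verbatim quote
// of them; queueLifo and handoff stand for the caller's own flags), the slow
// paths of a sync.Mutex pair each sleep with exactly one wakeup through the
// linknamed entry points declared below:
//
//	runtime_SemacquireMutex(&m.sema, queueLifo, 1) // Lock slow path: sleep
//	runtime_Semrelease(&m.sema, handoff, 1)        // Unlock slow path: one wakeup
//
// Even if, due to a race, the release runs before the matching waiter has
// parked, the wakeup is not lost: it is stored in *addr and later consumed
// by cansemacquire.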
    19	
    20	package runtime
    21	
    22	import (
    23		"internal/cpu"
    24		"runtime/internal/atomic"
    25		"unsafe"
    26	)
    27	
    28	// Asynchronous semaphore for sync.Mutex.
    29	
    30	// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
    31	// Each of those sudog may in turn point (through s.waitlink) to a list
    32	// of other sudogs waiting on the same address.
    33	// The operations on the inner lists of sudogs with the same address
    34	// are all O(1). The scanning of the top-level semaRoot treap is O(log n),
    35	// where n is the number of distinct addresses with goroutines blocked
    36	// on them that hash to the given semaRoot.
    37	// See golang.org/issue/17953 for a program that worked badly
    38	// before we introduced the second level of list, and test/locklinear.go
    39	// for a test that exercises this.
    40	type semaRoot struct {
    41		lock  mutex
    42		treap *sudog // root of balanced tree of unique waiters.
    43		nwait uint32 // Number of waiters. Read w/o the lock.
    44	}
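// For intuition, one semaRoot might look roughly like this (hypothetical
// addresses; arrows are the per-address wait lists linked via s.waitlink):
//
//	        treap (keyed by elem, heap-ordered by ticket)
//	              [0x518] -> w -> w
//	             /       \
//	       [0x210]        [0x7a0] -> w
//	       /
//	 [0x008]
//
// Finding the node for an address costs O(log n) in the number of distinct
// blocked addresses; adding or removing a waiter on an address that is
// already present only touches that node's list, in O(1).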
    45	
    46	// Prime to not correlate with any user patterns.
    47	const semTabSize = 251
    48	
    49	var semtable [semTabSize]struct {
    50		root semaRoot
    51		pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
    52	}
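// The pad field keeps each table entry on its own CPU cache line, so that
// goroutines hammering on different semaRoots do not slow each other down
// through false sharing of the roots' locks. As a rough illustration
// (assuming a 64-bit platform where cpu.CacheLinePadSize is 64):
//
//	unsafe.Sizeof(semaRoot{})   // 24 bytes: mutex + *sudog + uint32, padded
//	pad                         // 64 - 24 = 40 bytes, so each entry fills one line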
    53	
    54	//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
    55	func sync_runtime_Semacquire(addr *uint32) {
    56		semacquire1(addr, false, semaBlockProfile, 0)
    57	}
    58	
    59	//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
    60	func poll_runtime_Semacquire(addr *uint32) {
    61		semacquire1(addr, false, semaBlockProfile, 0)
    62	}
    63	
    64	//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
    65	func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    66		semrelease1(addr, handoff, skipframes)
    67	}
    68	
    69	//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
    70	func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    71		semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
    72	}
    73	
    74	//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
    75	func poll_runtime_Semrelease(addr *uint32) {
    76		semrelease(addr)
    77	}
    78	
    79	func readyWithTime(s *sudog, traceskip int) {
    80		if s.releasetime != 0 {
    81			s.releasetime = cputicks()
    82		}
    83		goready(s.g, traceskip)
    84	}
    85	
    86	type semaProfileFlags int
    87	
    88	const (
    89		semaBlockProfile semaProfileFlags = 1 << iota
    90		semaMutexProfile
    91	)
    92	
    93	// Called from runtime.
    94	func semacquire(addr *uint32) {
    95		semacquire1(addr, false, 0, 0)
    96	}
    97	
    98	func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    99		gp := getg()
   100		if gp != gp.m.curg {
   101			throw("semacquire not on the G stack")
   102		}
   103	
   104		// Easy case.
   105		if cansemacquire(addr) {
   106			return
   107		}
   108	
   109		// Harder case:
   110		//	increment waiter count
   111		//	try cansemacquire one more time, return if succeeded
   112		//	enqueue itself as a waiter
   113		//	sleep
   114		//	(waiter descriptor is dequeued by signaler)
   115		s := acquireSudog()
   116		root := semroot(addr)
   117		t0 := int64(0)
   118		s.releasetime = 0
   119		s.acquiretime = 0
   120		s.ticket = 0
   121		if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
   122			t0 = cputicks()
   123			s.releasetime = -1
   124		}
   125		if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
   126			if t0 == 0 {
   127				t0 = cputicks()
   128			}
   129			s.acquiretime = t0
   130		}
   131		for {
   132			lock(&root.lock)
   133			// Add ourselves to nwait to disable "easy case" in semrelease.
   134			atomic.Xadd(&root.nwait, 1)
   135			// Check cansemacquire to avoid missed wakeup.
   136			if cansemacquire(addr) {
   137				atomic.Xadd(&root.nwait, -1)
   138				unlock(&root.lock)
   139				break
   140			}
   141			// Any semrelease after the cansemacquire knows we're waiting
   142			// (we set nwait above), so go to sleep.
   143			root.queue(addr, s, lifo)
   144			goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
   145			if s.ticket != 0 || cansemacquire(addr) {
   146				break
   147			}
   148		}
   149		if s.releasetime > 0 {
   150			blockevent(s.releasetime-t0, 3+skipframes)
   151		}
   152		releaseSudog(s)
   153	}
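// Why the nwait increment must come before the cansemacquire recheck: consider
// a hypothetical interleaving between a releaser R and a would-be sleeper G,
// starting from *addr == 0 and nwait == 0.
//
//	G: cansemacquire(addr)        // easy case fails, *addr == 0
//	R: atomic.Xadd(addr, 1)       // post one wakeup
//	R: atomic.Load(&root.nwait)   // sees 0: takes the easy path and returns
//	G: atomic.Xadd(&root.nwait, 1)
//	G: cansemacquire(addr)        // recheck succeeds, consuming R's wakeup
//
// If G parked without the recheck, R's wakeup would be lost and G would sleep
// forever. With it, a release that loads nwait before G's increment has
// already published its count, so G's recheck (or some other acquirer) eats
// it; a release that loads nwait after the increment sees it nonzero and goes
// through the dequeue-and-wake path instead.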
   154	
   155	func semrelease(addr *uint32) {
   156		semrelease1(addr, false, 0)
   157	}
   158	
   159	func semrelease1(addr *uint32, handoff bool, skipframes int) {
   160		root := semroot(addr)
   161		atomic.Xadd(addr, 1)
   162	
   163		// Easy case: no waiters?
   164		// This check must happen after the xadd, to avoid a missed wakeup
   165		// (see loop in semacquire).
   166		if atomic.Load(&root.nwait) == 0 {
   167			return
   168		}
   169	
   170		// Harder case: search for a waiter and wake it.
   171		lock(&root.lock)
   172		if atomic.Load(&root.nwait) == 0 {
   173			// The count is already consumed by another goroutine,
   174			// so no need to wake up another goroutine.
   175			unlock(&root.lock)
   176			return
   177		}
   178		s, t0 := root.dequeue(addr)
   179		if s != nil {
   180			atomic.Xadd(&root.nwait, -1)
   181		}
   182		unlock(&root.lock)
   183		if s != nil { // May be slow, so unlock first
   184			acquiretime := s.acquiretime
   185			if acquiretime != 0 {
   186				mutexevent(t0-acquiretime, 3+skipframes)
   187			}
   188			if s.ticket != 0 {
   189				throw("corrupted semaphore ticket")
   190			}
   191			if handoff && cansemacquire(addr) {
   192				s.ticket = 1
   193			}
   194			readyWithTime(s, 5+skipframes)
   195		}
   196	}
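// A note on the handoff case above: without handoff, the released count sits
// in *addr and the woken waiter must still win cansemacquire against brand-new
// arrivals that never queued, so a spinning newcomer can barge in front of an
// old waiter. With handoff == true, the releaser consumes the count itself on
// the waiter's behalf and records that in s.ticket:
//
//	if handoff && cansemacquire(addr) {
//		s.ticket = 1 // the waiter wakes up already owning the semaphore
//	}
//
// which is why the loop in semacquire1 treats s.ticket != 0 as success. This
// direct handoff is, for example, what the starvation mode of sync.Mutex asks
// for by passing handoff == true to runtime_Semrelease.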
   197	
   198	func semroot(addr *uint32) *semaRoot {
   199		return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
   200	}
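// For illustration, a toy (hypothetical) address maps to its bucket as:
//
//	addr == 0x1040
//	(0x1040 >> 3) % 251 == 520 % 251 == 18
//
// The shift drops the low bits, which carry little information because these
// words tend to be 4- or 8-byte aligned, and 251 is prime so that regular
// address strides in user data do not collapse onto a few buckets.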
   201	
   202	func cansemacquire(addr *uint32) bool {
   203		for {
   204			v := atomic.Load(addr)
   205			if v == 0 {
   206				return false
   207			}
   208			if atomic.Cas(addr, v, v-1) {
   209				return true
   210			}
   211		}
   212	}
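// In effect *addr counts buffered wakeups, and cansemacquire consumes one if
// any are available, never blocking. For example, with *addr == 2, two
// consecutive calls return true (leaving 1, then 0) and a third returns
// false, sending that caller into the queue-and-park path of semacquire1.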
   213	
   214	// queue adds s to the blocked goroutines in semaRoot.
   215	func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
   216		s.g = getg()
   217		s.elem = unsafe.Pointer(addr)
   218		s.next = nil
   219		s.prev = nil
   220	
   221		var last *sudog
   222		pt := &root.treap
   223		for t := *pt; t != nil; t = *pt {
   224			if t.elem == unsafe.Pointer(addr) {
   225				// Already have addr in list.
   226				if lifo {
   227					// Substitute s in t's place in treap.
   228					*pt = s
   229					s.ticket = t.ticket
   230					s.acquiretime = t.acquiretime
   231					s.parent = t.parent
   232					s.prev = t.prev
   233					s.next = t.next
   234					if s.prev != nil {
   235						s.prev.parent = s
   236					}
   237					if s.next != nil {
   238						s.next.parent = s
   239					}
   240					// Add t first in s's wait list.
   241					s.waitlink = t
   242					s.waittail = t.waittail
   243					if s.waittail == nil {
   244						s.waittail = t
   245					}
   246					t.parent = nil
   247					t.prev = nil
   248					t.next = nil
   249					t.waittail = nil
   250				} else {
   251					// Add s to end of t's wait list.
   252					if t.waittail == nil {
   253						t.waitlink = s
   254					} else {
   255						t.waittail.waitlink = s
   256					}
   257					t.waittail = s
   258					s.waitlink = nil
   259				}
   260				return
   261			}
   262			last = t
   263			if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
   264				pt = &t.prev
   265			} else {
   266				pt = &t.next
   267			}
   268		}
   269	
   270		// Add s as new leaf in tree of unique addrs.
   271		// The balanced tree is a treap using ticket as the random heap priority.
   272		// That is, it is a binary tree ordered according to the elem addresses,
   273		// but then among the space of possible binary trees respecting those
   274		// addresses, it is kept balanced on average by maintaining a heap ordering
   275		// on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket.
   276		// https://en.wikipedia.org/wiki/Treap
   277		// https://faculty.washington.edu/aragon/pubs/rst89.pdf
   278		//
   279		// s.ticket is compared with zero in a couple of places, therefore set the lowest bit.
   280		// This will not noticeably affect the treap's quality.
   281		s.ticket = fastrand() | 1
   282		s.parent = last
   283		*pt = s
   284	
   285		// Rotate up into tree according to ticket (priority).
   286		for s.parent != nil && s.parent.ticket > s.ticket {
   287			if s.parent.prev == s {
   288				root.rotateRight(s.parent)
   289			} else {
   290				if s.parent.next != s {
   291					panic("semaRoot queue")
   292				}
   293				root.rotateLeft(s.parent)
   294			}
   295		}
   296	}
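// The effect of the lifo flag, for waiters w1, w2, w3 arriving in that order
// on one address (treap shape omitted):
//
//	lifo == false:  w1 -> w2 -> w3   // FIFO: dequeue wakes in arrival order
//	lifo == true:   w3 -> w2 -> w1   // each newcomer substitutes itself at the node
//
// The lifo substitution also copies t's acquiretime into s, keeping the mutex
// contention profile anchored at the oldest wait on that address. This is,
// for example, the mode sync.Mutex requests when it requeues a goroutine that
// has already waited once (the lifo argument of SemacquireMutex above).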
   297	
   298	// dequeue searches for and finds the first goroutine
   299	// in semaRoot blocked on addr.
   300	// If the sudog was being profiled, dequeue returns the time
   301	// at which it was woken up as now. Otherwise now is 0.
   302	func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
   303		ps := &root.treap
   304		s := *ps
   305		for ; s != nil; s = *ps {
   306			if s.elem == unsafe.Pointer(addr) {
   307				goto Found
   308			}
   309			if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
   310				ps = &s.prev
   311			} else {
   312				ps = &s.next
   313			}
   314		}
   315		return nil, 0
   316	
   317	Found:
   318		now = int64(0)
   319		if s.acquiretime != 0 {
   320			now = cputicks()
   321		}
   322		if t := s.waitlink; t != nil {
   323			// Substitute t, also waiting on addr, for s in root tree of unique addrs.
   324			*ps = t
   325			t.ticket = s.ticket
   326			t.parent = s.parent
   327			t.prev = s.prev
   328			if t.prev != nil {
   329				t.prev.parent = t
   330			}
   331			t.next = s.next
   332			if t.next != nil {
   333				t.next.parent = t
   334			}
   335			if t.waitlink != nil {
   336				t.waittail = s.waittail
   337			} else {
   338				t.waittail = nil
   339			}
   340			t.acquiretime = now
   341			s.waitlink = nil
   342			s.waittail = nil
   343		} else {
   344			// Rotate s down to be leaf of tree for removal, respecting priorities.
   345			for s.next != nil || s.prev != nil {
   346				if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
   347					root.rotateRight(s)
   348				} else {
   349					root.rotateLeft(s)
   350				}
   351			}
   352			// Remove s, now a leaf.
   353			if s.parent != nil {
   354				if s.parent.prev == s {
   355					s.parent.prev = nil
   356				} else {
   357					s.parent.next = nil
   358				}
   359			} else {
   360				root.treap = nil
   361			}
   362		}
   363		s.parent = nil
   364		s.elem = nil
   365		s.next = nil
   366		s.prev = nil
   367		s.ticket = 0
   368		return s, now
   369	}
   370	
   371	// rotateLeft rotates the tree rooted at node x,
   372	// turning (x a (y b c)) into (y (x a b) c).
   373	func (root *semaRoot) rotateLeft(x *sudog) {
   374		// p -> (x a (y b c))
   375		p := x.parent
   376		a, y := x.prev, x.next
   377		b, c := y.prev, y.next
   378	
   379		y.prev = x
   380		x.parent = y
   381		y.next = c
   382		if c != nil {
   383			c.parent = y
   384		}
   385		x.prev = a
   386		if a != nil {
   387			a.parent = x
   388		}
   389		x.next = b
   390		if b != nil {
   391			b.parent = x
   392		}
   393	
   394		y.parent = p
   395		if p == nil {
   396			root.treap = y
   397		} else if p.prev == x {
   398			p.prev = y
   399		} else {
   400			if p.next != x {
   401				throw("semaRoot rotateLeft")
   402			}
   403			p.next = y
   404		}
   405	}
   406	
   407	// rotateRight rotates the tree rooted at node y,
   408	// turning (y (x a b) c) into (x a (y b c)).
   409	func (root *semaRoot) rotateRight(y *sudog) {
   410		// p -> (y (x a b) c)
   411		p := y.parent
   412		x, c := y.prev, y.next
   413		a, b := x.prev, x.next
   414	
   415		x.prev = a
   416		if a != nil {
   417			a.parent = x
   418		}
   419		x.next = y
   420		y.parent = x
   421		y.prev = b
   422		if b != nil {
   423			b.parent = y
   424		}
   425		y.next = c
   426		if c != nil {
   427			c.parent = y
   428		}
   429	
   430		x.parent = p
   431		if p == nil {
   432			root.treap = x
   433		} else if p.prev == y {
   434			p.prev = x
   435		} else {
   436			if p.next != y {
   437				throw("semaRoot rotateRight")
   438			}
   439			p.next = x
   440		}
   441	}
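// Pictorially the two rotations are mirror images; subtrees a, b, c keep
// their left-to-right (address) order while only x and y swap depths:
//
//	      y                      x
//	     / \     rotateRight    / \
//	    x   c    ---------->   a   y
//	   / \       <----------      / \
//	  a   b       rotateLeft     b   c
//
// Each runs in O(1), preserves the elem ordering, and is chosen so that the
// smaller ticket ends up on top, restoring the treap's heap invariant.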
   442	
   443	// notifyList is a ticket-based notification list used to implement sync.Cond.
   444	//
   445	// It must be kept in sync with the sync package.
   446	type notifyList struct {
   447		// wait is the ticket number of the next waiter. It is atomically
   448		// incremented outside the lock.
   449		wait uint32
   450	
   451		// notify is the ticket number of the next waiter to be notified. It can
   452		// be read outside the lock, but is only written to with lock held.
   453		//
   454		// Both wait & notify can wrap around, and such cases will be correctly
   455		// handled as long as their "unwrapped" difference is bounded by 2^31.
   456		// For this not to be the case, we'd need to have 2^31+ goroutines
   457		// blocked on the same condvar, which is currently not possible.
   458		notify uint32
   459	
   460		// List of parked waiters.
   461		lock mutex
   462		head *sudog
   463		tail *sudog
   464	}
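// As a sketch of the intended use (mirroring sync.Cond, not quoting it):
//
//	// Wait:
//	t := notifyListAdd(l)  // take a ticket while the caller still holds its own lock
//	// ... caller unlocks its mutex ...
//	notifyListWait(l, t)   // park until ticket t is notified
//
//	// Signal and Broadcast:
//	notifyListNotifyOne(l) // notify the oldest un-notified ticket
//	notifyListNotifyAll(l) // notify every ticket handed out so far
//
// Because the ticket is taken before the caller's lock is released, a
// notification issued after the ticket was handed out is never lost:
// notifyListNotifyOne advances notify past an outstanding ticket, and the
// holder of that ticket is either woken from the list or, if it has not
// parked yet, returns immediately from notifyListWait.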
   465	
   466	// less checks if a < b, treating a and b as running counts that may wrap around
   467	// the 32-bit range, assuming their "unwrapped" difference is always less than 2^31.
   468	func less(a, b uint32) bool {
   469		return int32(a-b) < 0
   470	}
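// Worked examples (hypothetical ticket values, including one near the wrap):
//
//	less(1, 3)           // int32(1-3) == -2 < 0 -> true
//	less(3, 1)           // int32(3-1) ==  2     -> false
//	less(0xFFFFFFFE, 1)  // int32(...) == -3 < 0 -> true: 0xFFFFFFFE was handed
//	                     // out before the counter wrapped around past 1
//
// The comparison therefore stays correct across wraparound as long as the two
// counters never drift more than 2^31 apart, which the comment on notify above
// guarantees for this use.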
   471	
   472	// notifyListAdd adds the caller to a notify list such that it can receive
   473	// notifications. The caller must eventually call notifyListWait to wait for
   474	// such a notification, passing the returned ticket number.
   475	//go:linkname notifyListAdd sync.runtime_notifyListAdd
   476	func notifyListAdd(l *notifyList) uint32 {
   477		// This may be called concurrently, for example, when called from
   478		// sync.Cond.Wait while holding a RWMutex in read mode.
   479		return atomic.Xadd(&l.wait, 1) - 1
   480	}
   481	
   482	// notifyListWait waits for a notification. If one has been sent since
   483	// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
   484	//go:linkname notifyListWait sync.runtime_notifyListWait
   485	func notifyListWait(l *notifyList, t uint32) {
   486		lock(&l.lock)
   487	
   488		// Return right away if this ticket has already been notified.
   489		if less(t, l.notify) {
   490			unlock(&l.lock)
   491			return
   492		}
   493	
   494		// Enqueue itself.
   495		s := acquireSudog()
   496		s.g = getg()
   497		s.ticket = t
   498		s.releasetime = 0
   499		t0 := int64(0)
   500		if blockprofilerate > 0 {
   501			t0 = cputicks()
   502			s.releasetime = -1
   503		}
   504		if l.tail == nil {
   505			l.head = s
   506		} else {
   507			l.tail.next = s
   508		}
   509		l.tail = s
   510		goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
   511		if t0 != 0 {
   512			blockevent(s.releasetime-t0, 2)
   513		}
   514		releaseSudog(s)
   515	}
   516	
   517	// notifyListNotifyAll notifies all entries in the list.
   518	//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
   519	func notifyListNotifyAll(l *notifyList) {
   520		// Fast-path: if there are no new waiters since the last notification
   521		// we don't need to acquire the lock.
   522		if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
   523			return
   524		}
   525	
   526		// Pull the list out into a local variable, waiters will be readied
   527		// outside the lock.
   528		lock(&l.lock)
   529		s := l.head
   530		l.head = nil
   531		l.tail = nil
   532	
   533		// Update the next ticket to be notified. We can set it to the current
   534		// value of wait because any previous waiters are already in the list
   535		// or will notice that they have already been notified when trying to
   536		// add themselves to the list.
   537		atomic.Store(&l.notify, atomic.Load(&l.wait))
   538		unlock(&l.lock)
   539	
   540		// Go through the local list and ready all waiters.
   541		for s != nil {
   542			next := s.next
   543			s.next = nil
   544			readyWithTime(s, 4)
   545			s = next
   546		}
   547	}
   548	
   549	// notifyListNotifyOne notifies one entry in the list.
   550	//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
   551	func notifyListNotifyOne(l *notifyList) {
   552		// Fast-path: if there are no new waiters since the last notification
   553		// we don't need to acquire the lock at all.
   554		if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
   555			return
   556		}
   557	
   558		lock(&l.lock)
   559	
   560		// Re-check under the lock if we need to do anything.
   561		t := l.notify
   562		if t == atomic.Load(&l.wait) {
   563			unlock(&l.lock)
   564			return
   565		}
   566	
   567		// Update the next notify ticket number.
   568		atomic.Store(&l.notify, t+1)
   569	
   570		// Try to find the g that needs to be notified.
   571		// If it hasn't made it to the list yet we won't find it,
   572		// but it won't park itself once it sees the new notify number.
   573		//
   574		// This scan looks linear but essentially always stops quickly.
   575		// Because goroutines queue separately from taking numbers,
   576		// there may be minor reorderings in the list, but we
   577		// expect the g we're looking for to be near the front.
   578		// The g has others in front of it on the list only to the
   579		// extent that it lost the race, so the iteration will not
   580		// be too long. This applies even when the g is missing:
   581		// it hasn't yet gotten to sleep and has lost the race to
   582		// the (few) other g's that we find on the list.
   583		for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
   584			if s.ticket == t {
   585				n := s.next
   586				if p != nil {
   587					p.next = n
   588				} else {
   589					l.head = n
   590				}
   591				if n == nil {
   592					l.tail = p
   593				}
   594				unlock(&l.lock)
   595				s.next = nil
   596				readyWithTime(s, 4)
   597				return
   598			}
   599		}
   600		unlock(&l.lock)
   601	}
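// Concretely (hypothetical ticket numbers): suppose a goroutine took ticket
// t == 7 in notifyListAdd but has not yet reached notifyListWait when
// notifyListNotifyOne runs. The notifier advances l.notify from 7 to 8, scans
// the list, does not find ticket 7, and returns. When the ticket holder later
// enters notifyListWait, less(7, 8) is true, so it unlocks and returns at
// once; the notification is consumed even though nobody had to be woken.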
   602	
   603	//go:linkname notifyListCheck sync.runtime_notifyListCheck
   604	func notifyListCheck(sz uintptr) {
   605		if sz != unsafe.Sizeof(notifyList{}) {
   606			print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
   607			throw("bad notifyList size")
   608		}
   609	}
   610	
   611	//go:linkname sync_nanotime sync.runtime_nanotime
   612	func sync_nanotime() int64 {
   613		return nanotime()
   614	}
   615	
