...

Source file src/runtime/select.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package runtime
     6	
     7	// This file contains the implementation of Go select statements.
     8	
     9	import (
    10		"unsafe"
    11	)
    12	
// debugSelect is a compile-time switch for select debugging. When true,
// selectgo prints diagnostic lines for the chosen case and verifies that
// the computed lock order is sorted, throwing if it is not.
const debugSelect = false
    14	
    15	// scase.kind values.
    16	// Known to compiler.
    17	// Changes here must also be made in src/cmd/compile/internal/gc/select.go's walkselect.
    18	const (
    19		caseNil = iota
    20		caseRecv
    21		caseSend
    22		caseDefault
    23	)
    24	
// Select case descriptor.
// Known to compiler.
// Changes here must also be made in src/cmd/compile/internal/gc/select.go's scasetype.
//
// kind holds one of the case* constants. selectgo resets any send/recv
// case whose channel is nil to the zero scase (kind caseNil).
//
// releasetime is only used for block profiling: selectgo sets it to -1
// when blockprofilerate > 0, copies a positive release time from the
// case's sudog after waking, and reports cas.releasetime-t0 via
// blockevent for the chosen case.
type scase struct {
	c           *hchan         // chan
	elem        unsafe.Pointer // data element
	kind        uint16
	pc          uintptr // race pc (for race detector / msan)
	releasetime int64
}
    35	
    36	var (
    37		chansendpc = funcPC(chansend)
    38		chanrecvpc = funcPC(chanrecv)
    39	)
    40	
// selectsetpc stores the result of getcallerpc in cas.pc.
// NOTE(review): presumably this is the PC of the site that called
// selectsetpc — confirm against getcallerpc.
//
// selectgo hands cas.pc to the race-detector hooks; reflect_rselect only
// calls this when raceenabled or msanenabled is set.
func selectsetpc(cas *scase) {
	cas.pc = getcallerpc()
}
    44	
    45	func sellock(scases []scase, lockorder []uint16) {
    46		var c *hchan
    47		for _, o := range lockorder {
    48			c0 := scases[o].c
    49			if c0 != nil && c0 != c {
    50				c = c0
    51				lock(&c.lock)
    52			}
    53		}
    54	}
    55	
    56	func selunlock(scases []scase, lockorder []uint16) {
    57		// We must be very careful here to not touch sel after we have unlocked
    58		// the last lock, because sel can be freed right after the last unlock.
    59		// Consider the following situation.
    60		// First M calls runtime·park() in runtime·selectgo() passing the sel.
    61		// Once runtime·park() has unlocked the last lock, another M makes
    62		// the G that calls select runnable again and schedules it for execution.
    63		// When the G runs on another M, it locks all the locks and frees sel.
    64		// Now if the first M touches sel, it will access freed memory.
    65		for i := len(scases) - 1; i >= 0; i-- {
    66			c := scases[lockorder[i]].c
    67			if c == nil {
    68				break
    69			}
    70			if i > 0 && c == scases[lockorder[i-1]].c {
    71				continue // will unlock it on the next iteration
    72			}
    73			unlock(&c.lock)
    74		}
    75	}
    76	
    77	func selparkcommit(gp *g, _ unsafe.Pointer) bool {
    78		// This must not access gp's stack (see gopark). In
    79		// particular, it must not access the *hselect. That's okay,
    80		// because by the time this is called, gp.waiting has all
    81		// channels in lock order.
    82		var lastc *hchan
    83		for sg := gp.waiting; sg != nil; sg = sg.waitlink {
    84			if sg.c != lastc && lastc != nil {
    85				// As soon as we unlock the channel, fields in
    86				// any sudog with that channel may change,
    87				// including c and waitlink. Since multiple
    88				// sudogs may have the same channel, we unlock
    89				// only after we've passed the last instance
    90				// of a channel.
    91				unlock(&lastc.lock)
    92			}
    93			lastc = sg.c
    94		}
    95		if lastc != nil {
    96			unlock(&lastc.lock)
    97		}
    98		return true
    99	}
   100	
// block parks the calling goroutine through gopark with no unlock
// function and wait reason waitReasonSelectNoCases. reflect_rselect
// calls it for a select with zero cases.
func block() {
	gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever
}
   104	
// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16. Both reside on the goroutine's
// stack (regardless of any escaping in selectgo).
// NOTE(review): reflect_rselect in this file builds both arrays with
// make; confirm the stack-residence statement still holds for it.
//
// The first ncases entries of order0 are used as the poll order and the
// remaining ncases entries as the lock order.
//
// selectgo returns the index of the chosen scase within the cas0 array.
// Also, if the chosen scase was a receive operation, it reports whether
// a value was received.
//
// If the chosen case is a send on a closed channel, selectgo panics
// with "send on closed channel" instead of returning.
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	if debugSelect {
		print("select: cas0=", cas0, "\n")
	}

	// View the raw pointers as large fixed-size arrays so that they can
	// be sliced down to exactly ncases elements.
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

	scases := cas1[:ncases:ncases]
	pollorder := order1[:ncases:ncases]
	lockorder := order1[ncases:][:ncases:ncases]

	// Replace send/receive cases involving nil channels with
	// caseNil so logic below can assume non-nil channel.
	for i := range scases {
		cas := &scases[i]
		if cas.c == nil && cas.kind != caseDefault {
			*cas = scase{}
		}
	}

	// t0 stays 0 unless block profiling is on; pass 2 uses t0 != 0 to
	// decide whether to mark each sudog's releasetime.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
		for i := 0; i < ncases; i++ {
			scases[i].releasetime = -1
		}
	}

	// The compiler rewrites selects that statically have
	// only 0 or 1 cases plus default into simpler constructs.
	// The only way we can end up with such small ncases
	// values here is for a larger select in which most channels
	// have been nilled out. The general code handles those
	// cases correctly, and they are rare enough not to bother
	// optimizing (and needing to test).

	// generate permuted order
	// Each index i is placed at a random position j (chosen by
	// fastrandn(i+1)) and the previous occupant of j moves to slot i.
	// NOTE(review): this relies on pollorder[0] being 0 on entry — confirm
	// that callers pass a zeroed order array.
	for i := 1; i < ncases; i++ {
		j := fastrandn(uint32(i + 1))
		pollorder[i] = pollorder[j]
		pollorder[j] = uint16(i)
	}

	// sort the cases by Hchan address to get the locking order.
	// simple heap sort, to guarantee n log n time and constant stack footprint.
	// First loop: insert each case into lockorder, sifting it up past
	// parents with a smaller sort key.
	for i := 0; i < ncases; i++ {
		j := i
		// Start with the pollorder to permute cases on the same channel.
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	// Second loop: move the root (largest key) into slot i, then sift
	// the displaced entry o down through the remaining heap [0, i).
	for i := ncases - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			// Pick the child with the larger key.
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}

	if debugSelect {
		// Verify that lockorder is in non-decreasing sortkey order.
		for i := 0; i+1 < ncases; i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	}

	// lock all the channels involved in the select
	sellock(scases, lockorder)

	// These are declared up front because the code below jumps between
	// labels with goto.
	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

loop:
	// pass 1 - look for something already waiting
	var dfli int
	var dfl *scase
	var casi int
	var cas *scase
	var recvOK bool
	for i := 0; i < ncases; i++ {
		casi = int(pollorder[i])
		cas = &scases[casi]
		c = cas.c

		switch cas.kind {
		case caseNil:
			continue

		case caseRecv:
			sg = c.sendq.dequeue()
			if sg != nil {
				goto recv
			}
			if c.qcount > 0 {
				goto bufrecv
			}
			if c.closed != 0 {
				goto rclose
			}

		case caseSend:
			if raceenabled {
				racereadpc(c.raceaddr(), cas.pc, chansendpc)
			}
			if c.closed != 0 {
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
				goto send
			}
			if c.qcount < c.dataqsiz {
				goto bufsend
			}

		case caseDefault:
			// Remember the default case; it is only taken below if
			// no other case was ready during this pass.
			dfli = casi
			dfl = cas
		}
	}

	if dfl != nil {
		selunlock(scases, lockorder)
		casi = dfli
		cas = dfl
		goto retc
	}

	// pass 2 - enqueue on all chans
	gp = getg()
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		if cas.kind == caseNil {
			continue
		}
		c = cas.c
		sg := acquireSudog()
		sg.g = gp
		sg.isSelect = true
		// No stack splits between assigning elem and enqueuing
		// sg on gp.waiting where copystack can find it.
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
		// Construct waiting list in lock order.
		*nextp = sg
		nextp = &sg.waitlink

		switch cas.kind {
		case caseRecv:
			c.recvq.enqueue(sg)

		case caseSend:
			c.sendq.enqueue(sg)
		}
	}

	// wait for someone to wake us up
	// selparkcommit releases the channel locks as part of parking.
	gp.param = nil
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

	sellock(scases, lockorder)

	gp.selectDone = 0
	// gp.param is read back as the sudog of the case that fired; it is
	// nil when no case was recorded (see the cas == nil check below).
	sg = (*sudog)(gp.param)
	gp.param = nil

	// pass 3 - dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
	casi = -1
	cas = nil
	sglist = gp.waiting
	// Clear all elem before unlinking from gp.waiting.
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
	gp.waiting = nil

	// Walk the cases and the sudog list in lockstep: pass 2 created one
	// sudog per non-nil case, in this same lockorder.
	for _, casei := range lockorder {
		k = &scases[casei]
		if k.kind == caseNil {
			continue
		}
		if sglist.releasetime > 0 {
			k.releasetime = sglist.releasetime
		}
		if sg == sglist {
			// sg has already been dequeued by the G that woke us up.
			casi = int(casei)
			cas = k
		} else {
			c = k.c
			if k.kind == caseSend {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	if cas == nil {
		// We can wake up with gp.param == nil (so cas == nil)
		// when a channel involved in the select has been closed.
		// It is easiest to loop and re-run the operation;
		// we'll see that it's now closed.
		// Maybe some day we can signal the close explicitly,
		// but we'd have to distinguish close-on-reader from close-on-writer.
		// It's easiest not to duplicate the code and just recheck above.
		// We know that something closed, and things never un-close,
		// so we won't block again.
		goto loop
	}

	c = cas.c

	if debugSelect {
		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
	}

	if cas.kind == caseRecv {
		recvOK = true
	}

	if raceenabled {
		if cas.kind == caseRecv && cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
		} else if cas.kind == caseSend {
			raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
		}
	}
	if msanenabled {
		if cas.kind == caseRecv && cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		} else if cas.kind == caseSend {
			msanread(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
	// can receive from buffer
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
		}
		raceacquire(chanbuf(c, c.recvx))
		racerelease(chanbuf(c, c.recvx))
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	recvOK = true
	qp = chanbuf(c, c.recvx)
	// A nil cas.elem means the received value is discarded.
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	// Advance the receive index, wrapping around at the buffer size.
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// can send to buffer
	if raceenabled {
		raceacquire(chanbuf(c, c.sendx))
		racerelease(chanbuf(c, c.sendx))
		raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	// Advance the send index, wrapping around at the buffer size.
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
	// can receive from sleeping sender (sg)
	// The closure lets recv release the select's channel locks.
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	recvOK = true
	goto retc

rclose:
	// read at end of closed channel
	selunlock(scases, lockorder)
	recvOK = false
	// The receiver gets the zero value of the element type.
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	goto retc

send:
	// can send to a sleeping receiver (sg)
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	// The closure lets send release the select's channel locks.
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc:
	// Report a block event only if a positive release time was recorded
	// for the chosen case.
	if cas.releasetime > 0 {
		blockevent(cas.releasetime-t0, 1)
	}
	return casi, recvOK

sclose:
	// send on closed channel
	// Release the channel locks before panicking.
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}
   494	
   495	func (c *hchan) sortkey() uintptr {
   496		// TODO(khr): if we have a moving garbage collector, we'll need to
   497		// change this function.
   498		return uintptr(unsafe.Pointer(c))
   499	}
   500	
// A runtimeSelect is a single case passed to rselect.
// This must match ../reflect/value.go:/runtimeSelect
//
// reflect_rselect converts each runtimeSelect into an scase: dir picks
// the case kind, and for send and receive cases ch and val become the
// scase's channel and element pointer.
type runtimeSelect struct {
	dir selectDir
	typ unsafe.Pointer // channel type (not used here)
	ch  *hchan         // channel
	val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}
   509	
   510	// These values must match ../reflect/value.go:/SelectDir.
   511	type selectDir int
   512	
   513	const (
   514		_             selectDir = iota
   515		selectSend              // case Chan <- Send
   516		selectRecv              // case <-Chan:
   517		selectDefault           // default
   518	)
   519	
   520	//go:linkname reflect_rselect reflect.rselect
   521	func reflect_rselect(cases []runtimeSelect) (int, bool) {
   522		if len(cases) == 0 {
   523			block()
   524		}
   525		sel := make([]scase, len(cases))
   526		order := make([]uint16, 2*len(cases))
   527		for i := range cases {
   528			rc := &cases[i]
   529			switch rc.dir {
   530			case selectDefault:
   531				sel[i] = scase{kind: caseDefault}
   532			case selectSend:
   533				sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
   534			case selectRecv:
   535				sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
   536			}
   537			if raceenabled || msanenabled {
   538				selectsetpc(&sel[i])
   539			}
   540		}
   541	
   542		return selectgo(&sel[0], &order[0], len(cases))
   543	}
   544	
   545	func (q *waitq) dequeueSudoG(sgp *sudog) {
   546		x := sgp.prev
   547		y := sgp.next
   548		if x != nil {
   549			if y != nil {
   550				// middle of queue
   551				x.next = y
   552				y.prev = x
   553				sgp.next = nil
   554				sgp.prev = nil
   555				return
   556			}
   557			// end of queue
   558			x.next = nil
   559			q.last = x
   560			sgp.prev = nil
   561			return
   562		}
   563		if y != nil {
   564			// start of queue
   565			y.prev = nil
   566			q.first = y
   567			sgp.next = nil
   568			return
   569		}
   570	
   571		// x==y==nil. Either sgp is the only element in the queue,
   572		// or it has already been removed. Use q.first to disambiguate.
   573		if q.first == sgp {
   574			q.first = nil
   575			q.last = nil
   576		}
   577	}
   578	

View as plain text