Source file src/runtime/chan.go
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "runtime/internal/atomic"
22 "runtime/internal/math"
23 "unsafe"
24 )
25
// Constants shared by the channel implementation.
26 const (
// maxAlign is the largest element alignment makechan accepts; makechan also
// requires hchanSize to be a multiple of it.
27 maxAlign = 8
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
// maxAlign: the second term is the padding (-size mod maxAlign).
28 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
// debugChan gates the diagnostic print calls in makechan, chansend and chanrecv.
29 debugChan = false
30 )
31
// hchan is the runtime representation of a channel. makechan allocates and
// initializes it; chansend, chanrecv and closechan operate on it while
// holding lock.
32 type hchan struct {
33 qcount uint // number of elements currently stored in buf
34 dataqsiz uint // capacity of buf in elements (the size passed to makechan)
35 buf unsafe.Pointer // start of element storage; indexed through chanbuf
36 elemsize uint16 // size of one element in bytes (copied from elemtype.size)
37 closed uint32 // set to 1 by closechan; 0 while the channel is open
38 elemtype *_type // type descriptor of the element
39 sendx uint // buffer index where the next buffered send stores
40 recvx uint // buffer index where the next buffered receive reads
41 recvq waitq // receivers parked in chanrecv
42 sendq waitq // senders parked in chansend
43
44 // lock is taken by chansend, chanrecv and closechan before they
45 // modify the fields above or touch the sudogs on recvq/sendq.
46 //
47 // In this file goroutines are only made runnable (goready) after
48 // lock has been released: send and recv call their unlock callback
49 // first, and closechan wakes its collected list after unlock.
50 lock mutex
51 }
52
// waitq is a FIFO queue of sudogs linked through their next/prev fields.
// enqueue appends at last; dequeue removes from first.
53 type waitq struct {
54 first *sudog // head of the queue; nil when the queue is empty
55 last *sudog // tail of the queue; nil when the queue is empty
56 }
57
58
59 func reflect_makechan(t *chantype, size int) *hchan {
60 return makechan(t, size)
61 }
62
63 func makechan64(t *chantype, size int64) *hchan {
64 if int64(int(size)) != size {
65 panic(plainError("makechan: size out of range"))
66 }
67
68 return makechan(t, int(size))
69 }
70
71 func makechan(t *chantype, size int) *hchan {
72 elem := t.elem
73
74
75 if elem.size >= 1<<16 {
76 throw("makechan: invalid channel element type")
77 }
78 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
79 throw("makechan: bad alignment")
80 }
81
82 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
83 if overflow || mem > maxAlloc-hchanSize || size < 0 {
84 panic(plainError("makechan: size out of range"))
85 }
86
87
88
89
90
91 var c *hchan
92 switch {
93 case mem == 0:
94
95 c = (*hchan)(mallocgc(hchanSize, nil, true))
96
97 c.buf = c.raceaddr()
98 case elem.ptrdata == 0:
99
100
101 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
102 c.buf = add(unsafe.Pointer(c), hchanSize)
103 default:
104
105 c = new(hchan)
106 c.buf = mallocgc(mem, elem, true)
107 }
108
109 c.elemsize = uint16(elem.size)
110 c.elemtype = elem
111 c.dataqsiz = uint(size)
112
113 if debugChan {
114 print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
115 }
116 return c
117 }
118
119
120 func chanbuf(c *hchan, i uint) unsafe.Pointer {
121 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
122 }
123
124
125
126 func chansend1(c *hchan, elem unsafe.Pointer) {
127 chansend(c, elem, true, getcallerpc())
128 }
129
130
// chansend sends the value at ep on channel c and reports whether the value
// was sent.
//
// With block == false it never parks: it returns false when the send cannot
// complete immediately. With block == true it parks the calling goroutine on
// c.sendq until a receiver takes the value. It panics with
// "send on closed channel" if c is closed, either on entry or while the
// sender is parked. callerpc is used only for race-detector reporting.
142 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
143 if c == nil {
// Send on a nil channel: fail at once when non-blocking, otherwise park
// with no unlock function; control is not expected to come back.
144 if !block {
145 return false
146 }
147 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
148 throw("unreachable")
149 }
150
151 if debugChan {
152 print("chansend: chan=", c, "\n")
153 }
154
155 if raceenabled {
156 racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
157 }
158
159 // Fast path for non-blocking sends, evaluated before c.lock is taken.
160 //
161 // If the channel does not look closed and the send could not proceed
162 // right now, give up without locking:
163 //   - unbuffered channel with no receiver queued, or
164 //   - buffered channel whose buffer is full.
165 //
166 // The fields are read without holding c.lock, so this is a snapshot
167 // that may already be out of date when the function returns.
168 //
169 // A closed channel is deliberately not handled here: it falls through
170 // to the locked path below, which panics.
171
172
173 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
174 (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
175 return false
176 }
177
// t0 stays 0 unless block profiling is enabled; a non-zero t0 later marks
// the sudog so the waker records a release time.
178 var t0 int64
179 if blockprofilerate > 0 {
180 t0 = cputicks()
181 }
182
183 lock(&c.lock)
184
185 if c.closed != 0 {
186 unlock(&c.lock)
187 panic(plainError("send on closed channel"))
188 }
189
190 if sg := c.recvq.dequeue(); sg != nil {
191 // A receiver is already waiting: hand the value straight to it.
192 // send releases c.lock through the callback and readies the receiver.
193 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
194 return true
195 }
196
197 if c.qcount < c.dataqsiz {
198 // The buffer has a free slot: store the value at sendx and advance it.
199 qp := chanbuf(c, c.sendx)
200 if raceenabled {
201 raceacquire(qp)
202 racerelease(qp)
203 }
204 typedmemmove(c.elemtype, qp, ep)
205 c.sendx++
206 if c.sendx == c.dataqsiz {
// Wrap around: the buffer is used as a ring.
207 c.sendx = 0
208 }
209 c.qcount++
210 unlock(&c.lock)
211 return true
212 }
213
214 if !block {
215 unlock(&c.lock)
216 return false
217 }
218
219 // Blocking path: queue a sudog describing this send and park.
220 gp := getg()
221 mysg := acquireSudog()
222 mysg.releasetime = 0
223 if t0 != 0 {
// -1 asks whoever wakes us to overwrite releasetime with cputicks().
224 mysg.releasetime = -1
225 }
226
227
228 mysg.elem = ep
229 mysg.waitlink = nil
230 mysg.g = gp
231 mysg.isSelect = false
232 mysg.c = c
233 gp.waiting = mysg
234 gp.param = nil
235 c.sendq.enqueue(mysg)
236 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
237
238 // NOTE(review): KeepAlive(ep) presumably keeps the value being sent
239 // reachable until a receiver has copied it out - confirm.
240
241 KeepAlive(ep)
242
243 // Execution resumes here after another goroutine has readied us.
244 if mysg != gp.waiting {
245 throw("G waiting list is corrupted")
246 }
247 gp.waiting = nil
248 if gp.param == nil {
// closechan wakes parked senders with param == nil, whereas recv sets
// param to the sudog. A nil param with the channel still open is
// therefore treated as a fatal inconsistency.
249 if c.closed == 0 {
250 throw("chansend: spurious wakeup")
251 }
252 panic(plainError("send on closed channel"))
253 }
254 gp.param = nil
255 if mysg.releasetime > 0 {
256 blockevent(mysg.releasetime-t0, 2)
257 }
258 mysg.c = nil
259 releaseSudog(mysg)
260 return true
261 }
262
263
264
265
266
267
268
// send completes a send on channel c to the parked receiver sg.
//
// The value at ep is copied into the receiver's destination (sg.elem) unless
// that destination is nil. unlockf is then called - chansend passes a closure
// that unlocks c.lock - and only afterwards is the receiver's goroutine made
// runnable. sg is expected to have been dequeued from c.recvq already, as
// chansend does. skip+1 is forwarded to goready.
269 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
270 if raceenabled {
271 if c.dataqsiz == 0 {
272 racesync(c, sg)
273 } else {
274 // Race-detector builds only: annotate the buffer slot at recvx
275 // for both goroutines and advance recvx/sendx as if the value
276 // had travelled through the buffer. No element is stored there.
277 qp := chanbuf(c, c.recvx)
278 raceacquire(qp)
279 racerelease(qp)
280 raceacquireg(sg.g, qp)
281 racereleaseg(sg.g, qp)
282 c.recvx++
283 if c.recvx == c.dataqsiz {
284 c.recvx = 0
285 }
286 c.sendx = c.recvx
287 }
288 }
289 if sg.elem != nil {
// Copy straight into the receiver's destination, then clear the
// pointer held in the sudog.
290 sendDirect(c.elemtype, sg, ep)
291 sg.elem = nil
292 }
293 gp := sg.g
294 unlockf()
// A non-nil param is what the woken receiver uses to tell a completed
// receive from a wakeup caused by closechan (which sets param to nil).
295 gp.param = unsafe.Pointer(sg)
296 if sg.releasetime != 0 {
297 sg.releasetime = cputicks()
298 }
299 goready(gp, skip+1)
300 }
301
302
303
304
305
306
307
308
309
310
311
// sendDirect copies one value of type t from src into the destination
// recorded in the receiver's sudog (sg.elem). It calls typeBitsBulkBarrier
// for the destination range and then copies t.size bytes with memmove.
312 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
313 // NOTE(review): the explicit barrier followed by a raw memmove
314 // (instead of typedmemmove) suggests dst may live on another
315 // goroutine's stack - presumably sg.elem must be read once and
316 // used immediately. Confirm before reordering anything here.
317
318 dst := sg.elem
319 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
320
321
322 memmove(dst, src, t.size)
323 }
324
// recvDirect copies one value of type t from the source recorded in the
// sender's sudog (sg.elem) into dst. It calls typeBitsBulkBarrier for the
// range and then copies t.size bytes with memmove.
325 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
326 // NOTE(review): mirror image of sendDirect; presumably src may live
327 // on another goroutine's stack, so sg.elem is read once and used
328 // immediately - confirm before reordering.
329 src := sg.elem
330 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
331 memmove(dst, src, t.size)
332 }
333
334 func closechan(c *hchan) {
335 if c == nil {
336 panic(plainError("close of nil channel"))
337 }
338
339 lock(&c.lock)
340 if c.closed != 0 {
341 unlock(&c.lock)
342 panic(plainError("close of closed channel"))
343 }
344
345 if raceenabled {
346 callerpc := getcallerpc()
347 racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
348 racerelease(c.raceaddr())
349 }
350
351 c.closed = 1
352
353 var glist gList
354
355
356 for {
357 sg := c.recvq.dequeue()
358 if sg == nil {
359 break
360 }
361 if sg.elem != nil {
362 typedmemclr(c.elemtype, sg.elem)
363 sg.elem = nil
364 }
365 if sg.releasetime != 0 {
366 sg.releasetime = cputicks()
367 }
368 gp := sg.g
369 gp.param = nil
370 if raceenabled {
371 raceacquireg(gp, c.raceaddr())
372 }
373 glist.push(gp)
374 }
375
376
377 for {
378 sg := c.sendq.dequeue()
379 if sg == nil {
380 break
381 }
382 sg.elem = nil
383 if sg.releasetime != 0 {
384 sg.releasetime = cputicks()
385 }
386 gp := sg.g
387 gp.param = nil
388 if raceenabled {
389 raceacquireg(gp, c.raceaddr())
390 }
391 glist.push(gp)
392 }
393 unlock(&c.lock)
394
395
396 for !glist.empty() {
397 gp := glist.pop()
398 gp.schedlink = 0
399 goready(gp, 3)
400 }
401 }
402
403
404
405 func chanrecv1(c *hchan, elem unsafe.Pointer) {
406 chanrecv(c, elem, true)
407 }
408
409
410 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
411 _, received = chanrecv(c, elem, true)
412 return
413 }
414
415
416
417
418
419
420
// chanrecv receives from channel c and, when ep is non-nil, stores the
// received value at ep; a nil ep discards the value.
//
// Results, as produced by the code below:
//   - (false, false): block is false and nothing could be received now
//     (this includes a nil channel).
//   - (true, false): c is closed and empty; *ep is zeroed if ep is non-nil.
//   - (true, true): a value was received.
//
// With block == true the caller parks on c.recvq until a sender or
// closechan wakes it.
421 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
422
423
424
425 if debugChan {
426 print("chanrecv: chan=", c, "\n")
427 }
428
429 if c == nil {
// Receive from a nil channel: fail at once when non-blocking, otherwise
// park with no unlock function; control is not expected to come back.
430 if !block {
431 return
432 }
433 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
434 throw("unreachable")
435 }
436
437 // Fast path for non-blocking receives, evaluated before c.lock is taken.
438 //
439 // Give up without locking when nothing is available:
440 //   - unbuffered channel with no sender queued, or
441 //   - buffered channel with qcount == 0,
442 // and the channel is not closed.
443 //
444 // The emptiness test is evaluated first and closed is loaded last;
445 // qcount and closed are read with atomic loads.
446 //
447 // A closed channel falls through to the locked path so the caller gets
448 // (true, false) rather than "not ready".
449 if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
450 c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
451 atomic.Load(&c.closed) == 0 {
452 return
453 }
454
// t0 stays 0 unless block profiling is enabled.
455 var t0 int64
456 if blockprofilerate > 0 {
457 t0 = cputicks()
458 }
459
460 lock(&c.lock)
461
462 if c.closed != 0 && c.qcount == 0 {
// Closed and drained: report a zero value with received == false.
463 if raceenabled {
464 raceacquire(c.raceaddr())
465 }
466 unlock(&c.lock)
467 if ep != nil {
468 typedmemclr(c.elemtype, ep)
469 }
470 return true, false
471 }
472
473 if sg := c.sendq.dequeue(); sg != nil {
474 // A sender is parked. recv either takes its value directly
475 // (unbuffered) or takes the head of the buffer and moves the
476 // sender's value into the freed slot. It releases c.lock through
477 // the callback and readies the sender.
478 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
479 return true, true
480 }
481
482 if c.qcount > 0 {
483 // Take the element at the head of the buffer (recvx).
484 qp := chanbuf(c, c.recvx)
485 if raceenabled {
486 raceacquire(qp)
487 racerelease(qp)
488 }
489 if ep != nil {
490 typedmemmove(c.elemtype, ep, qp)
491 }
// Clear the vacated slot.
492 typedmemclr(c.elemtype, qp)
493 c.recvx++
494 if c.recvx == c.dataqsiz {
495 c.recvx = 0
496 }
497 c.qcount--
498 unlock(&c.lock)
499 return true, true
500 }
501
502 if !block {
503 unlock(&c.lock)
504 return false, false
505 }
506
507 // Blocking path: queue a sudog describing this receive and park.
508 gp := getg()
509 mysg := acquireSudog()
510 mysg.releasetime = 0
511 if t0 != 0 {
// -1 asks whoever wakes us to overwrite releasetime with cputicks().
512 mysg.releasetime = -1
513 }
514
515
516 mysg.elem = ep
517 mysg.waitlink = nil
518 gp.waiting = mysg
519 mysg.g = gp
520 mysg.isSelect = false
521 mysg.c = c
522 gp.param = nil
523 c.recvq.enqueue(mysg)
524 goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
525
526 // Execution resumes here after another goroutine has readied us.
527 if mysg != gp.waiting {
528 throw("G waiting list is corrupted")
529 }
530 gp.waiting = nil
531 if mysg.releasetime > 0 {
532 blockevent(mysg.releasetime-t0, 2)
533 }
// send wakes receivers with param set to the sudog, while closechan wakes
// them with param == nil, so a nil param means the channel was closed.
534 closed := gp.param == nil
535 gp.param = nil
536 mysg.c = nil
537 releaseSudog(mysg)
538 return true, !closed
539 }
540
541
542
543
544
545
546
547
548
549
550
551
552
553
// recv completes a receive on channel c from the parked sender sg.
//
// Unbuffered channel: the value is copied directly from the sender
// (sg.elem) into ep, unless ep is nil.
//
// Buffered channel: chansend only parks when the buffer had no free slot,
// so the head element at recvx is copied to ep (unless ep is nil) and the
// sender's value is then stored in that same slot; recvx advances and
// sendx is set equal to it.
//
// In both cases unlockf is called - chanrecv passes a closure that unlocks
// c.lock - before the sender's goroutine is made runnable. skip+1 is
// forwarded to goready.
554 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
555 if c.dataqsiz == 0 {
556 if raceenabled {
557 racesync(c, sg)
558 }
559 if ep != nil {
560 // Copy the value straight from the sender.
561 recvDirect(c.elemtype, sg, ep)
562 }
563 } else {
564 // Buffered channel with a parked sender: take the head of the
565 // queue and put the sender's value into the slot that frees up.
566 // qcount is left unchanged because one element leaves and one
567 // enters.
568 qp := chanbuf(c, c.recvx)
569 if raceenabled {
570 raceacquire(qp)
571 racerelease(qp)
572 raceacquireg(sg.g, qp)
573 racereleaseg(sg.g, qp)
574 }
575 // Copy the head element to the receiver.
576 if ep != nil {
577 typedmemmove(c.elemtype, ep, qp)
578 }
579 // Copy the sender's value into the same slot.
580 typedmemmove(c.elemtype, qp, sg.elem)
581 c.recvx++
582 if c.recvx == c.dataqsiz {
583 c.recvx = 0
584 }
585 c.sendx = c.recvx
586 }
587 sg.elem = nil
588 gp := sg.g
589 unlockf()
// A non-nil param tells the woken sender that its value was taken;
// chansend treats a nil param as a wakeup caused by close.
590 gp.param = unsafe.Pointer(sg)
591 if sg.releasetime != 0 {
592 sg.releasetime = cputicks()
593 }
594 goready(gp, skip+1)
595 }
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
615 return chansend(c, elem, false, getcallerpc())
616 }
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
636 selected, _ = chanrecv(c, elem, false)
637 return
638 }
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657 func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
658
659 selected, *received = chanrecv(c, elem, false)
660 return
661 }
662
663
664 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
665 return chansend(c, elem, !nb, getcallerpc())
666 }
667
668
669 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
670 return chanrecv(c, elem, !nb)
671 }
672
673
674 func reflect_chanlen(c *hchan) int {
675 if c == nil {
676 return 0
677 }
678 return int(c.qcount)
679 }
680
681
682 func reflectlite_chanlen(c *hchan) int {
683 if c == nil {
684 return 0
685 }
686 return int(c.qcount)
687 }
688
689
690 func reflect_chancap(c *hchan) int {
691 if c == nil {
692 return 0
693 }
694 return int(c.dataqsiz)
695 }
696
697
698 func reflect_chanclose(c *hchan) {
699 closechan(c)
700 }
701
702 func (q *waitq) enqueue(sgp *sudog) {
703 sgp.next = nil
704 x := q.last
705 if x == nil {
706 sgp.prev = nil
707 q.first = sgp
708 q.last = sgp
709 return
710 }
711 sgp.prev = x
712 x.next = sgp
713 q.last = sgp
714 }
715
// dequeue removes and returns the sudog at the head of q, or nil when q is
// empty. Sudogs flagged isSelect whose goroutine cannot be claimed (see the
// CAS below) are unlinked and skipped.
716 func (q *waitq) dequeue() *sudog {
717 for {
718 sgp := q.first
719 if sgp == nil {
720 return nil
721 }
722 y := sgp.next
723 if y == nil {
// sgp was the only element: the queue becomes empty.
724 q.first = nil
725 q.last = nil
726 } else {
// Unlink sgp and make its successor the new head.
727 y.prev = nil
728 q.first = y
729 sgp.next = nil
730 }
731
732 // For a sudog queued on behalf of a select (isSelect), claim its
733 // goroutine by switching selectDone from 0 to 1 atomically.
734 //
735 // If the CAS fails, selectDone was already non-zero - presumably
736 // another case of that select has already won - so this sudog is
737 // dropped from the queue and the loop tries the next one.
738 //
739 // Sudogs that are not part of a select are returned as-is.
740 if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
741 continue
742 }
743
744 return sgp
745 }
746 }
747
748 func (c *hchan) raceaddr() unsafe.Pointer {
749
750
751
752
753
754 return unsafe.Pointer(&c.buf)
755 }
756
757 func racesync(c *hchan, sg *sudog) {
758 racerelease(chanbuf(c, 0))
759 raceacquireg(sg.g, chanbuf(c, 0))
760 racereleaseg(sg.g, chanbuf(c, 0))
761 raceacquire(chanbuf(c, 0))
762 }
763
View as plain text