Source file src/pkg/runtime/netpoll.go
package runtime

import (
    "runtime/internal/atomic"
    "unsafe"
)
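// Integrated network poller (platform-independent part).
// A particular implementation (epoll, kqueue, event ports, AIX, Windows)
// supplies the platform hooks used below (netpollinit, netpolldescriptor,
// netpollopen, netpollclose, netpollarm and the poller itself) and reports
// I/O readiness back through netpollready.
//
// pollDesc contains two binary semaphores, rg and wg, that park the reader
// and writer goroutines respectively. A semaphore can be in these states:
//
//     pdReady   - an I/O readiness notification is pending; a goroutine
//                 consumes the notification by changing the state to nil.
//     pdWait    - a goroutine is preparing to park on the semaphore but is
//                 not parked yet; it commits by storing its g pointer, or a
//                 concurrent readiness notification changes the state to
//                 pdReady, or a concurrent timeout/close resets it to nil.
//     G pointer - the goroutine is parked on the semaphore; readiness or
//                 timeout/close changes the state to pdReady or nil and
//                 unparks the goroutine.
//     nil       - none of the above.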
const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

const pollBlockSize = 4 * 1024
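// Network poller descriptor.
//
// pollDesc contains no heap pointers: rg and wg hold g pointers as uintptr,
// and descriptors are carved out of persistently allocated, non-GC'd memory
// by pollCache.alloc.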
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and the
    // deadline timer callbacks, and so covers rseq, wseq, rt and wt.
    // fd is constant for the lifetime of the pollDesc. pollReset, pollWait,
    // pollWaitCanceled and netpollready proceed without taking the lock,
    // so closing, everr, rg, rd, wg and wd are manipulated lock-free,
    // with rg and wg storing a *g as a uintptr.
    lock    mutex
    fd      uintptr
    closing bool
    everr   bool    // marks event scanning error happened
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}

type pollCache struct {
    lock  mutex
    first *pollDesc
    // pollDesc objects must be type-stable: the poller can deliver a stale
    // readiness notification after a descriptor has been closed and reused.
    // Stale notifications are detected with the rseq/wseq sequence numbers,
    // which are incremented when deadlines change or a descriptor is reused.
}
var (
    netpollInited  uint32
    pollcache      pollCache
    netpollWaiters uint32
)
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollinit()
    atomic.Store(&netpollInited, 1)
}

func netpollinited() bool {
    return atomic.Load(&netpollInited) != 0
}
//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor

// poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
    fds := netpolldescriptor()
    if GOOS != "aix" {
        return fd == fds
    } else {
        // The AIX netpoll implementation uses a wakeup pipe, so
        // netpolldescriptor packs two descriptors into the low and high
        // 16 bits of its result; check both halves.
        return fd == fds&0xFFFF || fd == (fds>>16)&0xFFFF
    }
}
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
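// poll_runtime_pollOpen allocates a pollDesc for fd, resets its state, and
// registers fd with the platform poller via netpollopen. It returns the
// descriptor together with netpollopen's errno value.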
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
    if !pd.closing {
        throw("runtime: close polldesc w/o unblock")
    }
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on closing polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on closing polldesc")
    }
    netpollclose(pd.fd)
    pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
    lock(&c.lock)
    pd.link = c.first
    c.first = pd
    unlock(&c.lock)
}
//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
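// poll_runtime_pollReset prepares pd for polling in mode ('r' or 'w').
// It returns an error code: 0 for success, 1 if the descriptor is being
// closed, or 2 if the deadline has expired (see netpollcheckerr).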
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    if mode == 'r' {
        pd.rg = 0
    } else if mode == 'w' {
        pd.wg = 0
    }
    return 0
}
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
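// poll_runtime_pollWait blocks until pd is ready for I/O in mode ('r' or 'w')
// and returns 0, or returns a nonzero netpollcheckerr code if the descriptor
// is closed, times out, or hits an event scanning error.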
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // Only the level-triggered pollers (Solaris, illumos, AIX) need to arm
    // the descriptor for each wait.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
        // Can happen if the timeout fired and unblocked us,
        // but before we had a chance to run, the timeout was reset.
        // Pretend it has not happened and retry.
    }
    return 0
}
//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
    // This function is used only on Windows after a failed attempt to cancel
    // a pending async IO operation. Wait for ioready, ignore closing or timeouts.
    for !netpollblock(pd, int32(mode), true) {
    }
}
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
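// poll_runtime_pollSetDeadline sets the read and/or write deadline for pd to
// d nanoseconds from now; mode is 'r', 'w', or 'r'+'w'. d == 0 clears the
// deadline, and d < 0 marks it as already expired, unblocking any goroutine
// currently parked on the descriptor.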
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    lock(&pd.lock)
    if pd.closing {
        unlock(&pd.lock)
        return
    }
    rd0, wd0 := pd.rd, pd.wd
    combo0 := rd0 > 0 && rd0 == wd0
    if d > 0 {
        d += nanotime()
        if d <= 0 {
            // If the user has a deadline in the future, but the delay
            // calculation overflows, then set the deadline to the maximum
            // possible value.
            d = 1<<63 - 1
        }
    }
    if mode == 'r' || mode == 'r'+'w' {
        pd.rd = d
    }
    if mode == 'w' || mode == 'r'+'w' {
        pd.wd = d
    }
    combo := pd.rd > 0 && pd.rd == pd.wd
    rtf := netpollReadDeadline
    if combo {
        rtf = netpollDeadline
    }
    if pd.rt.f == nil {
        if pd.rd > 0 {
            pd.rt.f = rtf
            pd.rt.when = pd.rd
            // Copy the current seq into the timer arg.
            // The timer func will check the seq against the current
            // descriptor seq; if they differ the descriptor was reused
            // or the timers were reset.
            pd.rt.arg = pd
            pd.rt.seq = pd.rseq
            addtimer(&pd.rt)
        }
    } else if pd.rd != rd0 || combo != combo0 {
        pd.rseq++ // invalidate current timers
        if pd.rd > 0 {
            modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
        } else {
            deltimer(&pd.rt)
            pd.rt.f = nil
        }
    }
    if pd.wt.f == nil {
        if pd.wd > 0 && !combo {
            pd.wt.f = netpollWriteDeadline
            pd.wt.when = pd.wd
            pd.wt.arg = pd
            pd.wt.seq = pd.wseq
            addtimer(&pd.wt)
        }
    } else if pd.wd != wd0 || combo != combo0 {
        pd.wseq++ // invalidate current timers
        if pd.wd > 0 && !combo {
            modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd, pd.wseq)
        } else {
            deltimer(&pd.wt)
            pd.wt.f = nil
        }
    }
    // If the new deadline is in the past, unblock currently pending IO if any.
    var rg, wg *g
    if pd.rd < 0 || pd.wd < 0 {
        atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
        if pd.rd < 0 {
            rg = netpollunblock(pd, 'r', false)
        }
        if pd.wd < 0 {
            wg = netpollunblock(pd, 'w', false)
        }
    }
    unlock(&pd.lock)
    if rg != nil {
        netpollgoready(rg, 3)
    }
    if wg != nil {
        netpollgoready(wg, 3)
    }
}
//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
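// poll_runtime_pollUnblock marks pd as closing, invalidates its sequence
// numbers, stops any deadline timers, and readies the goroutines, if any,
// blocked on the descriptor. poll_runtime_pollClose can then free it.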
func poll_runtime_pollUnblock(pd *pollDesc) {
    lock(&pd.lock)
    if pd.closing {
        throw("runtime: unblock on closing polldesc")
    }
    pd.closing = true
    pd.rseq++
    pd.wseq++
    var rg, wg *g
    atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
    rg = netpollunblock(pd, 'r', false)
    wg = netpollunblock(pd, 'w', false)
    if pd.rt.f != nil {
        deltimer(&pd.rt)
        pd.rt.f = nil
    }
    if pd.wt.f != nil {
        deltimer(&pd.wt)
        pd.wt.f = nil
    }
    unlock(&pd.lock)
    if rg != nil {
        netpollgoready(rg, 3)
    }
    if wg != nil {
        netpollgoready(wg, 3)
    }
}
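// netpollready is called by the platform-specific poller when the fd
// associated with pd becomes ready for I/O. mode is 'r', 'w', or 'r'+'w' to
// indicate readiness for reading, writing, or both; any goroutines unblocked
// as a result are appended to toRun for the caller to schedule.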
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}
func netpollcheckerr(pd *pollDesc, mode int32) int {
    if pd.closing {
        return 1 // descriptor is being closed
    }
    if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
        return 2 // deadline has expired
    }
    // Report an event scanning error only on a read event.
    // An error on a write event will be captured in a subsequent
    // write call that is able to report a more specific error.
    if mode == 'r' && pd.everr {
        return 3 // event scanning error
    }
    return 0
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is no other work to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}
func netpollgoready(gp *g, traceskip int) {
    atomic.Xadd(&netpollWaiters, -1)
    goready(gp, traceskip+1)
}
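// netpollblock parks the current goroutine on pd for mode ('r' or 'w') and
// returns true if I/O became ready, or false on timeout or close. If waitio
// is true it waits only for completed I/O and ignores error states.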
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // Set the gpp semaphore to pdWait.
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // Need to recheck error states after setting gpp to pdWait:
    // pollUnblock/pollSetDeadline/deadlineimpl do the opposite order of
    // operations (store to closing/rd/wd, memory barrier, load of rg/wg).
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // Be careful to not lose a concurrent pdReady notification.
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}
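// netpollunblock removes the goroutine, if any, parked on pd for mode
// ('r' or 'w') and returns it so the caller can ready it. If ioready is true
// it also leaves a pdReady notification behind for a future waiter; the
// timeout and close paths pass ioready == false and leave no notification.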
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set pdReady for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}
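// netpolldeadlineimpl is the timer callback behind the deadline timers.
// It handles the read side, the write side, or both, ignores stale timers by
// comparing seq with the descriptor's current sequence number, marks the
// affected deadlines as expired (-1), and readies any parked goroutines.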
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
    lock(&pd.lock)
    // Seq arg is the seq recorded when the timer was set.
    // If it's stale, ignore the timer event.
    currentSeq := pd.rseq
    if !read {
        currentSeq = pd.wseq
    }
    if seq != currentSeq {
        // The descriptor was reused or the timers were reset.
        unlock(&pd.lock)
        return
    }
    var rg *g
    if read {
        if pd.rd <= 0 || pd.rt.f == nil {
            throw("runtime: inconsistent read deadline")
        }
        pd.rd = -1
        atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
        rg = netpollunblock(pd, 'r', false)
    }
    var wg *g
    if write {
        if pd.wd <= 0 || pd.wt.f == nil && !read {
            throw("runtime: inconsistent write deadline")
        }
        pd.wd = -1
        atomic.StorepNoWB(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
        wg = netpollunblock(pd, 'w', false)
    }
    unlock(&pd.lock)
    if rg != nil {
        netpollgoready(rg, 0)
    }
    if wg != nil {
        netpollgoready(wg, 0)
    }
}
func netpollDeadline(arg interface{}, seq uintptr) {
    netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
    netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
    netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}
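// alloc returns a pollDesc from the cache's free list, refilling the list
// from a freshly allocated block of descriptors when it is empty.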
func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        // Must be in non-GC memory because it can be referenced
        // only from epoll/kqueue internals.
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    unlock(&c.lock)
    return pd
}