...
Source file src/io/pipe.go
1
2
3
4
5
6
7
8 package io
9
10 import (
11 "errors"
12 "sync"
13 "sync/atomic"
14 )
15
16
17
18 type atomicError struct{ v atomic.Value }
19
20 func (a *atomicError) Store(err error) {
21 a.v.Store(struct{ error }{err})
22 }
23 func (a *atomicError) Load() error {
24 err, _ := a.v.Load().(struct{ error })
25 return err.error
26 }
27
28
29 var ErrClosedPipe = errors.New("io: read/write on closed pipe")
30
31
32 type pipe struct {
33 wrMu sync.Mutex
34 wrCh chan []byte
35 rdCh chan int
36
37 once sync.Once
38 done chan struct{}
39 rerr atomicError
40 werr atomicError
41 }
42
43 func (p *pipe) Read(b []byte) (n int, err error) {
44 select {
45 case <-p.done:
46 return 0, p.readCloseError()
47 default:
48 }
49
50 select {
51 case bw := <-p.wrCh:
52 nr := copy(b, bw)
53 p.rdCh <- nr
54 return nr, nil
55 case <-p.done:
56 return 0, p.readCloseError()
57 }
58 }
59
60 func (p *pipe) readCloseError() error {
61 rerr := p.rerr.Load()
62 if werr := p.werr.Load(); rerr == nil && werr != nil {
63 return werr
64 }
65 return ErrClosedPipe
66 }
67
68 func (p *pipe) CloseRead(err error) error {
69 if err == nil {
70 err = ErrClosedPipe
71 }
72 p.rerr.Store(err)
73 p.once.Do(func() { close(p.done) })
74 return nil
75 }
76
77 func (p *pipe) Write(b []byte) (n int, err error) {
78 select {
79 case <-p.done:
80 return 0, p.writeCloseError()
81 default:
82 p.wrMu.Lock()
83 defer p.wrMu.Unlock()
84 }
85
86 for once := true; once || len(b) > 0; once = false {
87 select {
88 case p.wrCh <- b:
89 nw := <-p.rdCh
90 b = b[nw:]
91 n += nw
92 case <-p.done:
93 return n, p.writeCloseError()
94 }
95 }
96 return n, nil
97 }
98
99 func (p *pipe) writeCloseError() error {
100 werr := p.werr.Load()
101 if rerr := p.rerr.Load(); werr == nil && rerr != nil {
102 return rerr
103 }
104 return ErrClosedPipe
105 }
106
107 func (p *pipe) CloseWrite(err error) error {
108 if err == nil {
109 err = EOF
110 }
111 p.werr.Store(err)
112 p.once.Do(func() { close(p.done) })
113 return nil
114 }
115
116
117 type PipeReader struct {
118 p *pipe
119 }
120
121
122
123
124
125
126 func (r *PipeReader) Read(data []byte) (n int, err error) {
127 return r.p.Read(data)
128 }
129
130
131
132 func (r *PipeReader) Close() error {
133 return r.CloseWithError(nil)
134 }
135
136
137
138 func (r *PipeReader) CloseWithError(err error) error {
139 return r.p.CloseRead(err)
140 }
141
142
143 type PipeWriter struct {
144 p *pipe
145 }
146
147
148
149
150
151
152 func (w *PipeWriter) Write(data []byte) (n int, err error) {
153 return w.p.Write(data)
154 }
155
156
157
158 func (w *PipeWriter) Close() error {
159 return w.CloseWithError(nil)
160 }
161
162
163
164
165
166
167 func (w *PipeWriter) CloseWithError(err error) error {
168 return w.p.CloseWrite(err)
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186 func Pipe() (*PipeReader, *PipeWriter) {
187 p := &pipe{
188 wrCh: make(chan []byte),
189 rdCh: make(chan int),
190 done: make(chan struct{}),
191 }
192 return &PipeReader{p}, &PipeWriter{p}
193 }
194
View as plain text