Source file src/net/pipe.go
1
2
3
4
5 package net
6
7 import (
8 "io"
9 "sync"
10 "time"
11 )
12
13
// pipeDeadline tracks a single deadline and reports its expiry by closing
// a channel.
type pipeDeadline struct {
	mu     sync.Mutex    // held by set and wait while they access timer and cancel
	timer  *time.Timer   // timer armed by set for a future deadline; nil otherwise
	cancel chan struct{} // closed when the deadline has passed; replaced by set when re-armed
}
19
20 func makePipeDeadline() pipeDeadline {
21 return pipeDeadline{cancel: make(chan struct{})}
22 }
23
24
25
26
27
28
29
30 func (d *pipeDeadline) set(t time.Time) {
31 d.mu.Lock()
32 defer d.mu.Unlock()
33
34 if d.timer != nil && !d.timer.Stop() {
35 <-d.cancel
36 }
37 d.timer = nil
38
39
40 closed := isClosedChan(d.cancel)
41 if t.IsZero() {
42 if closed {
43 d.cancel = make(chan struct{})
44 }
45 return
46 }
47
48
49 if dur := time.Until(t); dur > 0 {
50 if closed {
51 d.cancel = make(chan struct{})
52 }
53 d.timer = time.AfterFunc(dur, func() {
54 close(d.cancel)
55 })
56 return
57 }
58
59
60 if !closed {
61 close(d.cancel)
62 }
63 }
64
65
66 func (d *pipeDeadline) wait() chan struct{} {
67 d.mu.Lock()
68 defer d.mu.Unlock()
69 return d.cancel
70 }
71
// isClosedChan reports whether a receive on c can proceed immediately.
// It never blocks. For a channel that is only ever closed and never sent
// on, that is the same as asking whether c has been closed.
func isClosedChan(c <-chan struct{}) bool {
	ready := false
	select {
	case <-c:
		ready = true
	default:
		// Receive would block: the channel is still open (or nil).
	}
	return ready
}
80
// timeoutError is the error value read and write return when their
// deadline channel has been closed.
type timeoutError struct{}

// Error returns the fixed message for an expired deadline.
func (timeoutError) Error() string {
	return "deadline exceeded"
}

// Timeout always reports true.
func (timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (timeoutError) Temporary() bool {
	return true
}
86
// pipeAddr is the address value that both LocalAddr and RemoteAddr of a
// pipe return.
type pipeAddr struct{}

// Network returns the network name, "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the address in string form, "pipe".
func (pipeAddr) String() string {
	return "pipe"
}
91
92 type pipe struct {
93 wrMu sync.Mutex
94
95
96
97 rdRx <-chan []byte
98 rdTx chan<- int
99
100
101
102 wrTx chan<- []byte
103 wrRx <-chan int
104
105 once sync.Once
106 localDone chan struct{}
107 remoteDone <-chan struct{}
108
109 readDeadline pipeDeadline
110 writeDeadline pipeDeadline
111 }
112
113
114
115
116
117
118 func Pipe() (Conn, Conn) {
119 cb1 := make(chan []byte)
120 cb2 := make(chan []byte)
121 cn1 := make(chan int)
122 cn2 := make(chan int)
123 done1 := make(chan struct{})
124 done2 := make(chan struct{})
125
126 p1 := &pipe{
127 rdRx: cb1, rdTx: cn1,
128 wrTx: cb2, wrRx: cn2,
129 localDone: done1, remoteDone: done2,
130 readDeadline: makePipeDeadline(),
131 writeDeadline: makePipeDeadline(),
132 }
133 p2 := &pipe{
134 rdRx: cb2, rdTx: cn2,
135 wrTx: cb1, wrRx: cn1,
136 localDone: done2, remoteDone: done1,
137 readDeadline: makePipeDeadline(),
138 writeDeadline: makePipeDeadline(),
139 }
140 return p1, p2
141 }
142
143 func (*pipe) LocalAddr() Addr { return pipeAddr{} }
144 func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
145
146 func (p *pipe) Read(b []byte) (int, error) {
147 n, err := p.read(b)
148 if err != nil && err != io.EOF && err != io.ErrClosedPipe {
149 err = &OpError{Op: "read", Net: "pipe", Err: err}
150 }
151 return n, err
152 }
153
154 func (p *pipe) read(b []byte) (n int, err error) {
155 switch {
156 case isClosedChan(p.localDone):
157 return 0, io.ErrClosedPipe
158 case isClosedChan(p.remoteDone):
159 return 0, io.EOF
160 case isClosedChan(p.readDeadline.wait()):
161 return 0, timeoutError{}
162 }
163
164 select {
165 case bw := <-p.rdRx:
166 nr := copy(b, bw)
167 p.rdTx <- nr
168 return nr, nil
169 case <-p.localDone:
170 return 0, io.ErrClosedPipe
171 case <-p.remoteDone:
172 return 0, io.EOF
173 case <-p.readDeadline.wait():
174 return 0, timeoutError{}
175 }
176 }
177
178 func (p *pipe) Write(b []byte) (int, error) {
179 n, err := p.write(b)
180 if err != nil && err != io.ErrClosedPipe {
181 err = &OpError{Op: "write", Net: "pipe", Err: err}
182 }
183 return n, err
184 }
185
186 func (p *pipe) write(b []byte) (n int, err error) {
187 switch {
188 case isClosedChan(p.localDone):
189 return 0, io.ErrClosedPipe
190 case isClosedChan(p.remoteDone):
191 return 0, io.ErrClosedPipe
192 case isClosedChan(p.writeDeadline.wait()):
193 return 0, timeoutError{}
194 }
195
196 p.wrMu.Lock()
197 defer p.wrMu.Unlock()
198 for once := true; once || len(b) > 0; once = false {
199 select {
200 case p.wrTx <- b:
201 nw := <-p.wrRx
202 b = b[nw:]
203 n += nw
204 case <-p.localDone:
205 return n, io.ErrClosedPipe
206 case <-p.remoteDone:
207 return n, io.ErrClosedPipe
208 case <-p.writeDeadline.wait():
209 return n, timeoutError{}
210 }
211 }
212 return n, nil
213 }
214
215 func (p *pipe) SetDeadline(t time.Time) error {
216 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
217 return io.ErrClosedPipe
218 }
219 p.readDeadline.set(t)
220 p.writeDeadline.set(t)
221 return nil
222 }
223
224 func (p *pipe) SetReadDeadline(t time.Time) error {
225 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
226 return io.ErrClosedPipe
227 }
228 p.readDeadline.set(t)
229 return nil
230 }
231
232 func (p *pipe) SetWriteDeadline(t time.Time) error {
233 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
234 return io.ErrClosedPipe
235 }
236 p.writeDeadline.set(t)
237 return nil
238 }
239
240 func (p *pipe) Close() error {
241 p.once.Do(func() { close(p.localDone) })
242 return nil
243 }
244
View as plain text