...

Source file src/io/pipe.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	// Pipe adapter to connect code expecting an io.Reader
     6	// with code expecting an io.Writer.
     7	
     8	package io
     9	
    10	import (
    11		"errors"
    12		"sync"
    13		"sync/atomic"
    14	)
    15	
    16	// atomicError is a type-safe atomic value for errors.
    17	// We use a struct{ error } to ensure consistent use of a concrete type.
    18	type atomicError struct{ v atomic.Value }
    19	
    20	func (a *atomicError) Store(err error) {
    21		a.v.Store(struct{ error }{err})
    22	}
    23	func (a *atomicError) Load() error {
    24		err, _ := a.v.Load().(struct{ error })
    25		return err.error
    26	}
    27	
    28	// ErrClosedPipe is the error used for read or write operations on a closed pipe.
    29	var ErrClosedPipe = errors.New("io: read/write on closed pipe")
    30	
    31	// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    32	type pipe struct {
    33		wrMu sync.Mutex // Serializes Write operations
    34		wrCh chan []byte
    35		rdCh chan int
    36	
    37		once sync.Once // Protects closing done
    38		done chan struct{}
    39		rerr atomicError
    40		werr atomicError
    41	}
    42	
    43	func (p *pipe) Read(b []byte) (n int, err error) {
    44		select {
    45		case <-p.done:
    46			return 0, p.readCloseError()
    47		default:
    48		}
    49	
    50		select {
    51		case bw := <-p.wrCh:
    52			nr := copy(b, bw)
    53			p.rdCh <- nr
    54			return nr, nil
    55		case <-p.done:
    56			return 0, p.readCloseError()
    57		}
    58	}
    59	
    60	func (p *pipe) readCloseError() error {
    61		rerr := p.rerr.Load()
    62		if werr := p.werr.Load(); rerr == nil && werr != nil {
    63			return werr
    64		}
    65		return ErrClosedPipe
    66	}
    67	
    68	func (p *pipe) CloseRead(err error) error {
    69		if err == nil {
    70			err = ErrClosedPipe
    71		}
    72		p.rerr.Store(err)
    73		p.once.Do(func() { close(p.done) })
    74		return nil
    75	}
    76	
    77	func (p *pipe) Write(b []byte) (n int, err error) {
    78		select {
    79		case <-p.done:
    80			return 0, p.writeCloseError()
    81		default:
    82			p.wrMu.Lock()
    83			defer p.wrMu.Unlock()
    84		}
    85	
    86		for once := true; once || len(b) > 0; once = false {
    87			select {
    88			case p.wrCh <- b:
    89				nw := <-p.rdCh
    90				b = b[nw:]
    91				n += nw
    92			case <-p.done:
    93				return n, p.writeCloseError()
    94			}
    95		}
    96		return n, nil
    97	}
    98	
    99	func (p *pipe) writeCloseError() error {
   100		werr := p.werr.Load()
   101		if rerr := p.rerr.Load(); werr == nil && rerr != nil {
   102			return rerr
   103		}
   104		return ErrClosedPipe
   105	}
   106	
   107	func (p *pipe) CloseWrite(err error) error {
   108		if err == nil {
   109			err = EOF
   110		}
   111		p.werr.Store(err)
   112		p.once.Do(func() { close(p.done) })
   113		return nil
   114	}
   115	
   116	// A PipeReader is the read half of a pipe.
   117	type PipeReader struct {
   118		p *pipe
   119	}
   120	
   121	// Read implements the standard Read interface:
   122	// it reads data from the pipe, blocking until a writer
   123	// arrives or the write end is closed.
   124	// If the write end is closed with an error, that error is
   125	// returned as err; otherwise err is EOF.
   126	func (r *PipeReader) Read(data []byte) (n int, err error) {
   127		return r.p.Read(data)
   128	}
   129	
   130	// Close closes the reader; subsequent writes to the
   131	// write half of the pipe will return the error ErrClosedPipe.
   132	func (r *PipeReader) Close() error {
   133		return r.CloseWithError(nil)
   134	}
   135	
   136	// CloseWithError closes the reader; subsequent writes
   137	// to the write half of the pipe will return the error err.
   138	func (r *PipeReader) CloseWithError(err error) error {
   139		return r.p.CloseRead(err)
   140	}
   141	
   142	// A PipeWriter is the write half of a pipe.
   143	type PipeWriter struct {
   144		p *pipe
   145	}
   146	
   147	// Write implements the standard Write interface:
   148	// it writes data to the pipe, blocking until one or more readers
   149	// have consumed all the data or the read end is closed.
   150	// If the read end is closed with an error, that err is
   151	// returned as err; otherwise err is ErrClosedPipe.
   152	func (w *PipeWriter) Write(data []byte) (n int, err error) {
   153		return w.p.Write(data)
   154	}
   155	
   156	// Close closes the writer; subsequent reads from the
   157	// read half of the pipe will return no bytes and EOF.
   158	func (w *PipeWriter) Close() error {
   159		return w.CloseWithError(nil)
   160	}
   161	
   162	// CloseWithError closes the writer; subsequent reads from the
   163	// read half of the pipe will return no bytes and the error err,
   164	// or EOF if err is nil.
   165	//
   166	// CloseWithError always returns nil.
   167	func (w *PipeWriter) CloseWithError(err error) error {
   168		return w.p.CloseWrite(err)
   169	}
   170	
   171	// Pipe creates a synchronous in-memory pipe.
   172	// It can be used to connect code expecting an io.Reader
   173	// with code expecting an io.Writer.
   174	//
   175	// Reads and Writes on the pipe are matched one to one
   176	// except when multiple Reads are needed to consume a single Write.
   177	// That is, each Write to the PipeWriter blocks until it has satisfied
   178	// one or more Reads from the PipeReader that fully consume
   179	// the written data.
   180	// The data is copied directly from the Write to the corresponding
   181	// Read (or Reads); there is no internal buffering.
   182	//
   183	// It is safe to call Read and Write in parallel with each other or with Close.
   184	// Parallel calls to Read and parallel calls to Write are also safe:
   185	// the individual calls will be gated sequentially.
   186	func Pipe() (*PipeReader, *PipeWriter) {
   187		p := &pipe{
   188			wrCh: make(chan []byte),
   189			rdCh: make(chan int),
   190			done: make(chan struct{}),
   191		}
   192		return &PipeReader{p}, &PipeWriter{p}
   193	}
   194	

View as plain text