...

Source file src/net/pipe.go

     1	// Copyright 2010 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package net
     6	
     7	import (
     8		"io"
     9		"sync"
    10		"time"
    11	)
    12	
    13	// pipeDeadline is an abstraction for handling timeouts.
    14	type pipeDeadline struct {
    15		mu     sync.Mutex // Guards timer and cancel
    16		timer  *time.Timer
    17		cancel chan struct{} // Must be non-nil
    18	}
    19	
    20	func makePipeDeadline() pipeDeadline {
    21		return pipeDeadline{cancel: make(chan struct{})}
    22	}
    23	
    24	// set sets the point in time when the deadline will time out.
    25	// A timeout event is signaled by closing the channel returned by waiter.
    26	// Once a timeout has occurred, the deadline can be refreshed by specifying a
    27	// t value in the future.
    28	//
    29	// A zero value for t prevents timeout.
    30	func (d *pipeDeadline) set(t time.Time) {
    31		d.mu.Lock()
    32		defer d.mu.Unlock()
    33	
    34		if d.timer != nil && !d.timer.Stop() {
    35			<-d.cancel // Wait for the timer callback to finish and close cancel
    36		}
    37		d.timer = nil
    38	
    39		// Time is zero, then there is no deadline.
    40		closed := isClosedChan(d.cancel)
    41		if t.IsZero() {
    42			if closed {
    43				d.cancel = make(chan struct{})
    44			}
    45			return
    46		}
    47	
    48		// Time in the future, setup a timer to cancel in the future.
    49		if dur := time.Until(t); dur > 0 {
    50			if closed {
    51				d.cancel = make(chan struct{})
    52			}
    53			d.timer = time.AfterFunc(dur, func() {
    54				close(d.cancel)
    55			})
    56			return
    57		}
    58	
    59		// Time in the past, so close immediately.
    60		if !closed {
    61			close(d.cancel)
    62		}
    63	}
    64	
    65	// wait returns a channel that is closed when the deadline is exceeded.
    66	func (d *pipeDeadline) wait() chan struct{} {
    67		d.mu.Lock()
    68		defer d.mu.Unlock()
    69		return d.cancel
    70	}
    71	
    72	func isClosedChan(c <-chan struct{}) bool {
    73		select {
    74		case <-c:
    75			return true
    76		default:
    77			return false
    78		}
    79	}
    80	
    81	type timeoutError struct{}
    82	
    83	func (timeoutError) Error() string   { return "deadline exceeded" }
    84	func (timeoutError) Timeout() bool   { return true }
    85	func (timeoutError) Temporary() bool { return true }
    86	
    87	type pipeAddr struct{}
    88	
    89	func (pipeAddr) Network() string { return "pipe" }
    90	func (pipeAddr) String() string  { return "pipe" }
    91	
    92	type pipe struct {
    93		wrMu sync.Mutex // Serialize Write operations
    94	
    95		// Used by local Read to interact with remote Write.
    96		// Successful receive on rdRx is always followed by send on rdTx.
    97		rdRx <-chan []byte
    98		rdTx chan<- int
    99	
   100		// Used by local Write to interact with remote Read.
   101		// Successful send on wrTx is always followed by receive on wrRx.
   102		wrTx chan<- []byte
   103		wrRx <-chan int
   104	
   105		once       sync.Once // Protects closing localDone
   106		localDone  chan struct{}
   107		remoteDone <-chan struct{}
   108	
   109		readDeadline  pipeDeadline
   110		writeDeadline pipeDeadline
   111	}
   112	
   113	// Pipe creates a synchronous, in-memory, full duplex
   114	// network connection; both ends implement the Conn interface.
   115	// Reads on one end are matched with writes on the other,
   116	// copying data directly between the two; there is no internal
   117	// buffering.
   118	func Pipe() (Conn, Conn) {
   119		cb1 := make(chan []byte)
   120		cb2 := make(chan []byte)
   121		cn1 := make(chan int)
   122		cn2 := make(chan int)
   123		done1 := make(chan struct{})
   124		done2 := make(chan struct{})
   125	
   126		p1 := &pipe{
   127			rdRx: cb1, rdTx: cn1,
   128			wrTx: cb2, wrRx: cn2,
   129			localDone: done1, remoteDone: done2,
   130			readDeadline:  makePipeDeadline(),
   131			writeDeadline: makePipeDeadline(),
   132		}
   133		p2 := &pipe{
   134			rdRx: cb2, rdTx: cn2,
   135			wrTx: cb1, wrRx: cn1,
   136			localDone: done2, remoteDone: done1,
   137			readDeadline:  makePipeDeadline(),
   138			writeDeadline: makePipeDeadline(),
   139		}
   140		return p1, p2
   141	}
   142	
   143	func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
   144	func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
   145	
   146	func (p *pipe) Read(b []byte) (int, error) {
   147		n, err := p.read(b)
   148		if err != nil && err != io.EOF && err != io.ErrClosedPipe {
   149			err = &OpError{Op: "read", Net: "pipe", Err: err}
   150		}
   151		return n, err
   152	}
   153	
   154	func (p *pipe) read(b []byte) (n int, err error) {
   155		switch {
   156		case isClosedChan(p.localDone):
   157			return 0, io.ErrClosedPipe
   158		case isClosedChan(p.remoteDone):
   159			return 0, io.EOF
   160		case isClosedChan(p.readDeadline.wait()):
   161			return 0, timeoutError{}
   162		}
   163	
   164		select {
   165		case bw := <-p.rdRx:
   166			nr := copy(b, bw)
   167			p.rdTx <- nr
   168			return nr, nil
   169		case <-p.localDone:
   170			return 0, io.ErrClosedPipe
   171		case <-p.remoteDone:
   172			return 0, io.EOF
   173		case <-p.readDeadline.wait():
   174			return 0, timeoutError{}
   175		}
   176	}
   177	
   178	func (p *pipe) Write(b []byte) (int, error) {
   179		n, err := p.write(b)
   180		if err != nil && err != io.ErrClosedPipe {
   181			err = &OpError{Op: "write", Net: "pipe", Err: err}
   182		}
   183		return n, err
   184	}
   185	
   186	func (p *pipe) write(b []byte) (n int, err error) {
   187		switch {
   188		case isClosedChan(p.localDone):
   189			return 0, io.ErrClosedPipe
   190		case isClosedChan(p.remoteDone):
   191			return 0, io.ErrClosedPipe
   192		case isClosedChan(p.writeDeadline.wait()):
   193			return 0, timeoutError{}
   194		}
   195	
   196		p.wrMu.Lock() // Ensure entirety of b is written together
   197		defer p.wrMu.Unlock()
   198		for once := true; once || len(b) > 0; once = false {
   199			select {
   200			case p.wrTx <- b:
   201				nw := <-p.wrRx
   202				b = b[nw:]
   203				n += nw
   204			case <-p.localDone:
   205				return n, io.ErrClosedPipe
   206			case <-p.remoteDone:
   207				return n, io.ErrClosedPipe
   208			case <-p.writeDeadline.wait():
   209				return n, timeoutError{}
   210			}
   211		}
   212		return n, nil
   213	}
   214	
   215	func (p *pipe) SetDeadline(t time.Time) error {
   216		if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
   217			return io.ErrClosedPipe
   218		}
   219		p.readDeadline.set(t)
   220		p.writeDeadline.set(t)
   221		return nil
   222	}
   223	
   224	func (p *pipe) SetReadDeadline(t time.Time) error {
   225		if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
   226			return io.ErrClosedPipe
   227		}
   228		p.readDeadline.set(t)
   229		return nil
   230	}
   231	
   232	func (p *pipe) SetWriteDeadline(t time.Time) error {
   233		if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
   234			return io.ErrClosedPipe
   235		}
   236		p.writeDeadline.set(t)
   237		return nil
   238	}
   239	
   240	func (p *pipe) Close() error {
   241		p.once.Do(func() { close(p.localDone) })
   242		return nil
   243	}
   244	

View as plain text