...

Source file src/pkg/net/rpc/client.go

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package rpc
     6	
     7	import (
     8		"bufio"
     9		"encoding/gob"
    10		"errors"
    11		"io"
    12		"log"
    13		"net"
    14		"net/http"
    15		"sync"
    16	)
    17	
    18	// ServerError represents an error that has been returned from
    19	// the remote side of the RPC connection.
    20	type ServerError string
    21	
    22	func (e ServerError) Error() string {
    23		return string(e)
    24	}
    25	
    26	var ErrShutdown = errors.New("connection is shut down")
    27	
    28	// Call represents an active RPC.
    29	type Call struct {
    30		ServiceMethod string      // The name of the service and method to call.
    31		Args          interface{} // The argument to the function (*struct).
    32		Reply         interface{} // The reply from the function (*struct).
    33		Error         error       // After completion, the error status.
    34		Done          chan *Call  // Strobes when call is complete.
    35	}
    36	
    37	// Client represents an RPC Client.
    38	// There may be multiple outstanding Calls associated
    39	// with a single Client, and a Client may be used by
    40	// multiple goroutines simultaneously.
    41	type Client struct {
    42		codec ClientCodec
    43	
    44		reqMutex sync.Mutex // protects following
    45		request  Request
    46	
    47		mutex    sync.Mutex // protects following
    48		seq      uint64
    49		pending  map[uint64]*Call
    50		closing  bool // user has called Close
    51		shutdown bool // server has told us to stop
    52	}
    53	
    54	// A ClientCodec implements writing of RPC requests and
    55	// reading of RPC responses for the client side of an RPC session.
    56	// The client calls WriteRequest to write a request to the connection
    57	// and calls ReadResponseHeader and ReadResponseBody in pairs
    58	// to read responses. The client calls Close when finished with the
    59	// connection. ReadResponseBody may be called with a nil
    60	// argument to force the body of the response to be read and then
    61	// discarded.
    62	// See NewClient's comment for information about concurrent access.
    63	type ClientCodec interface {
    64		WriteRequest(*Request, interface{}) error
    65		ReadResponseHeader(*Response) error
    66		ReadResponseBody(interface{}) error
    67	
    68		Close() error
    69	}
    70	
    71	func (client *Client) send(call *Call) {
    72		client.reqMutex.Lock()
    73		defer client.reqMutex.Unlock()
    74	
    75		// Register this call.
    76		client.mutex.Lock()
    77		if client.shutdown || client.closing {
    78			client.mutex.Unlock()
    79			call.Error = ErrShutdown
    80			call.done()
    81			return
    82		}
    83		seq := client.seq
    84		client.seq++
    85		client.pending[seq] = call
    86		client.mutex.Unlock()
    87	
    88		// Encode and send the request.
    89		client.request.Seq = seq
    90		client.request.ServiceMethod = call.ServiceMethod
    91		err := client.codec.WriteRequest(&client.request, call.Args)
    92		if err != nil {
    93			client.mutex.Lock()
    94			call = client.pending[seq]
    95			delete(client.pending, seq)
    96			client.mutex.Unlock()
    97			if call != nil {
    98				call.Error = err
    99				call.done()
   100			}
   101		}
   102	}
   103	
   104	func (client *Client) input() {
   105		var err error
   106		var response Response
   107		for err == nil {
   108			response = Response{}
   109			err = client.codec.ReadResponseHeader(&response)
   110			if err != nil {
   111				break
   112			}
   113			seq := response.Seq
   114			client.mutex.Lock()
   115			call := client.pending[seq]
   116			delete(client.pending, seq)
   117			client.mutex.Unlock()
   118	
   119			switch {
   120			case call == nil:
   121				// We've got no pending call. That usually means that
   122				// WriteRequest partially failed, and call was already
   123				// removed; response is a server telling us about an
   124				// error reading request body. We should still attempt
   125				// to read error body, but there's no one to give it to.
   126				err = client.codec.ReadResponseBody(nil)
   127				if err != nil {
   128					err = errors.New("reading error body: " + err.Error())
   129				}
   130			case response.Error != "":
   131				// We've got an error response. Give this to the request;
   132				// any subsequent requests will get the ReadResponseBody
   133				// error if there is one.
   134				call.Error = ServerError(response.Error)
   135				err = client.codec.ReadResponseBody(nil)
   136				if err != nil {
   137					err = errors.New("reading error body: " + err.Error())
   138				}
   139				call.done()
   140			default:
   141				err = client.codec.ReadResponseBody(call.Reply)
   142				if err != nil {
   143					call.Error = errors.New("reading body " + err.Error())
   144				}
   145				call.done()
   146			}
   147		}
   148		// Terminate pending calls.
   149		client.reqMutex.Lock()
   150		client.mutex.Lock()
   151		client.shutdown = true
   152		closing := client.closing
   153		if err == io.EOF {
   154			if closing {
   155				err = ErrShutdown
   156			} else {
   157				err = io.ErrUnexpectedEOF
   158			}
   159		}
   160		for _, call := range client.pending {
   161			call.Error = err
   162			call.done()
   163		}
   164		client.mutex.Unlock()
   165		client.reqMutex.Unlock()
   166		if debugLog && err != io.EOF && !closing {
   167			log.Println("rpc: client protocol error:", err)
   168		}
   169	}
   170	
   171	func (call *Call) done() {
   172		select {
   173		case call.Done <- call:
   174			// ok
   175		default:
   176			// We don't want to block here. It is the caller's responsibility to make
   177			// sure the channel has enough buffer space. See comment in Go().
   178			if debugLog {
   179				log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
   180			}
   181		}
   182	}
   183	
   184	// NewClient returns a new Client to handle requests to the
   185	// set of services at the other end of the connection.
   186	// It adds a buffer to the write side of the connection so
   187	// the header and payload are sent as a unit.
   188	//
   189	// The read and write halves of the connection are serialized independently,
   190	// so no interlocking is required. However each half may be accessed
   191	// concurrently so the implementation of conn should protect against
   192	// concurrent reads or concurrent writes.
   193	func NewClient(conn io.ReadWriteCloser) *Client {
   194		encBuf := bufio.NewWriter(conn)
   195		client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
   196		return NewClientWithCodec(client)
   197	}
   198	
   199	// NewClientWithCodec is like NewClient but uses the specified
   200	// codec to encode requests and decode responses.
   201	func NewClientWithCodec(codec ClientCodec) *Client {
   202		client := &Client{
   203			codec:   codec,
   204			pending: make(map[uint64]*Call),
   205		}
   206		go client.input()
   207		return client
   208	}
   209	
   210	type gobClientCodec struct {
   211		rwc    io.ReadWriteCloser
   212		dec    *gob.Decoder
   213		enc    *gob.Encoder
   214		encBuf *bufio.Writer
   215	}
   216	
   217	func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
   218		if err = c.enc.Encode(r); err != nil {
   219			return
   220		}
   221		if err = c.enc.Encode(body); err != nil {
   222			return
   223		}
   224		return c.encBuf.Flush()
   225	}
   226	
   227	func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
   228		return c.dec.Decode(r)
   229	}
   230	
   231	func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
   232		return c.dec.Decode(body)
   233	}
   234	
   235	func (c *gobClientCodec) Close() error {
   236		return c.rwc.Close()
   237	}
   238	
   239	// DialHTTP connects to an HTTP RPC server at the specified network address
   240	// listening on the default HTTP RPC path.
   241	func DialHTTP(network, address string) (*Client, error) {
   242		return DialHTTPPath(network, address, DefaultRPCPath)
   243	}
   244	
   245	// DialHTTPPath connects to an HTTP RPC server
   246	// at the specified network address and path.
   247	func DialHTTPPath(network, address, path string) (*Client, error) {
   248		var err error
   249		conn, err := net.Dial(network, address)
   250		if err != nil {
   251			return nil, err
   252		}
   253		io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
   254	
   255		// Require successful HTTP response
   256		// before switching to RPC protocol.
   257		resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
   258		if err == nil && resp.Status == connected {
   259			return NewClient(conn), nil
   260		}
   261		if err == nil {
   262			err = errors.New("unexpected HTTP response: " + resp.Status)
   263		}
   264		conn.Close()
   265		return nil, &net.OpError{
   266			Op:   "dial-http",
   267			Net:  network + " " + address,
   268			Addr: nil,
   269			Err:  err,
   270		}
   271	}
   272	
   273	// Dial connects to an RPC server at the specified network address.
   274	func Dial(network, address string) (*Client, error) {
   275		conn, err := net.Dial(network, address)
   276		if err != nil {
   277			return nil, err
   278		}
   279		return NewClient(conn), nil
   280	}
   281	
   282	// Close calls the underlying codec's Close method. If the connection is already
   283	// shutting down, ErrShutdown is returned.
   284	func (client *Client) Close() error {
   285		client.mutex.Lock()
   286		if client.closing {
   287			client.mutex.Unlock()
   288			return ErrShutdown
   289		}
   290		client.closing = true
   291		client.mutex.Unlock()
   292		return client.codec.Close()
   293	}
   294	
   295	// Go invokes the function asynchronously. It returns the Call structure representing
   296	// the invocation. The done channel will signal when the call is complete by returning
   297	// the same Call object. If done is nil, Go will allocate a new channel.
   298	// If non-nil, done must be buffered or Go will deliberately crash.
   299	func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
   300		call := new(Call)
   301		call.ServiceMethod = serviceMethod
   302		call.Args = args
   303		call.Reply = reply
   304		if done == nil {
   305			done = make(chan *Call, 10) // buffered.
   306		} else {
   307			// If caller passes done != nil, it must arrange that
   308			// done has enough buffer for the number of simultaneous
   309			// RPCs that will be using that channel. If the channel
   310			// is totally unbuffered, it's best not to run at all.
   311			if cap(done) == 0 {
   312				log.Panic("rpc: done channel is unbuffered")
   313			}
   314		}
   315		call.Done = done
   316		client.send(call)
   317		return call
   318	}
   319	
   320	// Call invokes the named function, waits for it to complete, and returns its error status.
   321	func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
   322		call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
   323		return call.Error
   324	}
   325	

View as plain text