...

Source file src/internal/poll/fd_windows.go

     1	// Copyright 2017 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package poll
     6	
     7	import (
     8		"errors"
     9		"internal/race"
    10		"internal/syscall/windows"
    11		"io"
    12		"runtime"
    13		"sync"
    14		"syscall"
    15		"unicode/utf16"
    16		"unicode/utf8"
    17		"unsafe"
    18	)
    19	
var (
	// initErr records a WSAStartup failure from init; Init returns it
	// for every FD so the failure is reported on first use.
	initErr error
	// ioSync is used only as an address for the race detector's
	// Acquire/ReleaseMerge annotations around network reads and writes.
	ioSync  uint64
)

// The CancelIo Windows API cancels all outstanding IO for a particular
// socket issued by the current thread only. To overcome that limitation,
// we run special goroutines, each locked to a single OS thread, that both
// start and cancel IO. This means there are 2 unavoidable thread switches
// for every IO.
// Some newer versions of Windows have the CancelIoEx API, which does
// not have that limitation and can be used from any thread. This
// package uses the CancelIoEx API if present; otherwise it falls back
// to CancelIo.

var canCancelIO bool // determines if CancelIoEx API is present

// This package uses the SetFileCompletionNotificationModes Windows
// API to skip calling GetQueuedCompletionStatus if an IO operation
// completes synchronously. There is a known bug where
// SetFileCompletionNotificationModes crashes on some systems (see
// https://support.microsoft.com/kb/2568167 for details).

var useSetFileCompletionNotificationModes bool // determines if SetFileCompletionNotificationModes is present and safe to use
    44	
    45	// checkSetFileCompletionNotificationModes verifies that
    46	// SetFileCompletionNotificationModes Windows API is present
    47	// on the system and is safe to use.
    48	// See https://support.microsoft.com/kb/2568167 for details.
    49	func checkSetFileCompletionNotificationModes() {
    50		err := syscall.LoadSetFileCompletionNotificationModes()
    51		if err != nil {
    52			return
    53		}
    54		protos := [2]int32{syscall.IPPROTO_TCP, 0}
    55		var buf [32]syscall.WSAProtocolInfo
    56		len := uint32(unsafe.Sizeof(buf))
    57		n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len)
    58		if err != nil {
    59			return
    60		}
    61		for i := int32(0); i < n; i++ {
    62			if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 {
    63				return
    64			}
    65		}
    66		useSetFileCompletionNotificationModes = true
    67	}
    68	
    69	func init() {
    70		var d syscall.WSAData
    71		e := syscall.WSAStartup(uint32(0x202), &d)
    72		if e != nil {
    73			initErr = e
    74		}
    75		canCancelIO = syscall.LoadCancelIoEx() == nil
    76		checkSetFileCompletionNotificationModes()
    77	}
    78	
// operation contains a superset of the data necessary to perform all
// async IO on an FD. Each FD owns two of them: rop for reads and wop
// for writes.
type operation struct {
	// Used by IOCP interface, it must be the first field
	// of the struct, as our code relies on it.
	o syscall.Overlapped

	// fields used by runtime.netpoll
	// NOTE(review): the runtime presumably accesses these by offset from
	// the Overlapped pointer it receives, so their order and types must
	// not change independently of the runtime — confirm before editing.
	runtimeCtx uintptr // copy of fd.pd.runtimeCtx, set by FD.Init
	mode       int32   // 'r' or 'w', set by FD.Init
	errno      int32   // completion error code; nonzero means failure (see ExecIO)
	qty        uint32  // number of bytes transferred, returned by ExecIO

	// fields used only by net package
	fd     *FD
	errc   chan error              // ioSrv result channel; only created when !canCancelIO
	buf    syscall.WSABuf          // single-buffer descriptor filled by InitBuf
	msg    windows.WSAMsg          // message descriptor filled by InitMsg
	sa     syscall.Sockaddr        // destination address for ConnectEx and WriteTo
	rsa    *syscall.RawSockaddrAny // source address buffer for ReadFrom
	rsan   int32                   // byte length of rsa / of each AcceptEx address slot
	handle syscall.Handle          // socket being accepted into by acceptOne
	flags  uint32                  // in/out flags for WSARecv and WSARecvFrom
	bufs   []syscall.WSABuf        // multi-buffer descriptors filled by InitBufs
}
   103	
   104	func (o *operation) InitBuf(buf []byte) {
   105		o.buf.Len = uint32(len(buf))
   106		o.buf.Buf = nil
   107		if len(buf) != 0 {
   108			o.buf.Buf = &buf[0]
   109		}
   110	}
   111	
   112	func (o *operation) InitBufs(buf *[][]byte) {
   113		if o.bufs == nil {
   114			o.bufs = make([]syscall.WSABuf, 0, len(*buf))
   115		} else {
   116			o.bufs = o.bufs[:0]
   117		}
   118		for _, b := range *buf {
   119			if len(b) == 0 {
   120				o.bufs = append(o.bufs, syscall.WSABuf{})
   121				continue
   122			}
   123			for len(b) > maxRW {
   124				o.bufs = append(o.bufs, syscall.WSABuf{Len: maxRW, Buf: &b[0]})
   125				b = b[maxRW:]
   126			}
   127			if len(b) > 0 {
   128				o.bufs = append(o.bufs, syscall.WSABuf{Len: uint32(len(b)), Buf: &b[0]})
   129			}
   130		}
   131	}
   132	
   133	// ClearBufs clears all pointers to Buffers parameter captured
   134	// by InitBufs, so it can be released by garbage collector.
   135	func (o *operation) ClearBufs() {
   136		for i := range o.bufs {
   137			o.bufs[i].Buf = nil
   138		}
   139		o.bufs = o.bufs[:0]
   140	}
   141	
   142	func (o *operation) InitMsg(p []byte, oob []byte) {
   143		o.InitBuf(p)
   144		o.msg.Buffers = &o.buf
   145		o.msg.BufferCount = 1
   146	
   147		o.msg.Name = nil
   148		o.msg.Namelen = 0
   149	
   150		o.msg.Flags = 0
   151		o.msg.Control.Len = uint32(len(oob))
   152		o.msg.Control.Buf = nil
   153		if len(oob) != 0 {
   154			o.msg.Control.Buf = &oob[0]
   155		}
   156	}
   157	
   158	// ioSrv executes net IO requests.
   159	type ioSrv struct {
   160		req chan ioSrvReq
   161	}
   162	
   163	type ioSrvReq struct {
   164		o      *operation
   165		submit func(o *operation) error // if nil, cancel the operation
   166	}
   167	
   168	// ProcessRemoteIO will execute submit IO requests on behalf
   169	// of other goroutines, all on a single os thread, so it can
   170	// cancel them later. Results of all operations will be sent
   171	// back to their requesters via channel supplied in request.
   172	// It is used only when the CancelIoEx API is unavailable.
   173	func (s *ioSrv) ProcessRemoteIO() {
   174		runtime.LockOSThread()
   175		defer runtime.UnlockOSThread()
   176		for r := range s.req {
   177			if r.submit != nil {
   178				r.o.errc <- r.submit(r.o)
   179			} else {
   180				r.o.errc <- syscall.CancelIo(r.o.fd.Sysfd)
   181			}
   182		}
   183	}
   184	
// ExecIO executes a single IO operation o.
//
// When the Windows CancelIoEx API is available, it submits (and, if
// needed, cancels) the IO on the current thread. Otherwise it sends the
// submit and cancel requests to the dedicated goroutine behind s (see
// ProcessRemoteIO). In both cases it then waits on runtime netpoll for
// completion, and cancels the request if that wait is interrupted.
//
// It returns the number of bytes transferred (o.qty) and an error.
// ERROR_MORE_DATA and WSAEMSGSIZE are returned together with the partial
// byte count. If the wait is interrupted by close or timeout, the IO is
// canceled and ErrNetClosing, ErrFileClosing or ErrTimeout is returned,
// unless the IO completed before the cancellation took effect. It panics
// on any other netpoll error, or if CancelIoEx fails with anything other
// than ERROR_NOT_FOUND.
func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, error) {
	// Only descriptors registered with the runtime poller (pollable in
	// FD.Init) can be waited on.
	if o.fd.pd.runtimeCtx == 0 {
		return 0, errors.New("internal error: polling on unsupported descriptor type")
	}

	// Without CancelIoEx, lazily start the dedicated IO goroutines.
	if !canCancelIO {
		onceStartServer.Do(startServer)
	}

	fd := o.fd
	// Notify runtime netpoll about starting IO.
	err := fd.pd.prepare(int(o.mode), fd.isFile)
	if err != nil {
		return 0, err
	}
	// Start IO.
	if canCancelIO {
		err = submit(o)
	} else {
		// Send request to a special dedicated thread,
		// so it can stop the IO with CancelIO later.
		s.req <- ioSrvReq{o, submit}
		err = <-o.errc
	}
	switch err {
	case nil:
		// IO completed immediately
		if o.fd.skipSyncNotif {
			// No completion message will follow, so return immediately.
			return int(o.qty), nil
		}
		// Need to get our completion message anyway.
	case syscall.ERROR_IO_PENDING:
		// IO started, and we have to wait for its completion.
		err = nil
	default:
		return 0, err
	}
	// Wait for our request to complete.
	err = fd.pd.wait(int(o.mode), fd.isFile)
	if err == nil {
		// All is good. Extract our IO results and return.
		// NOTE(review): o.errno and o.qty are presumably filled in by the
		// runtime poller from the completion packet — confirm.
		if o.errno != 0 {
			err = syscall.Errno(o.errno)
			// More data available. Return back the size of received data.
			if err == syscall.ERROR_MORE_DATA || err == windows.WSAEMSGSIZE {
				return int(o.qty), err
			}
			return 0, err
		}
		return int(o.qty), nil
	}
	// IO is interrupted by "close" or "timeout"
	netpollErr := err
	switch netpollErr {
	case ErrNetClosing, ErrFileClosing, ErrTimeout:
		// will deal with those.
	default:
		panic("unexpected runtime.netpoll error: " + netpollErr.Error())
	}
	// Cancel our request.
	if canCancelIO {
		err := syscall.CancelIoEx(fd.Sysfd, &o.o)
		// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
		if err != nil && err != syscall.ERROR_NOT_FOUND {
			// TODO(brainman): maybe do something else, but panic.
			panic(err)
		}
	} else {
		// Cancellation must run on the same thread that issued the IO.
		s.req <- ioSrvReq{o, nil}
		<-o.errc
	}
	// Wait for cancellation to complete.
	// NOTE(review): presumably this waits until the completion for o has
	// been delivered, so that o is safe to reuse after returning — confirm.
	fd.pd.waitCanceled(int(o.mode))
	if o.errno != 0 {
		err = syscall.Errno(o.errno)
		if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
			err = netpollErr
		}
		return 0, err
	}
	// We issued a cancellation request. But, it seems, IO operation succeeded
	// before the cancellation request run. We need to treat the IO operation as
	// succeeded (the bytes are actually sent/recv from network).
	return int(o.qty), nil
}
   275	
   276	// Start helper goroutines.
   277	var rsrv, wsrv ioSrv
   278	var onceStartServer sync.Once
   279	
   280	func startServer() {
   281		// This is called, once, when only the CancelIo API is available.
   282		// Start two special goroutines, both locked to an OS thread,
   283		// that start and cancel IO requests.
   284		// One will process read requests, while the other will do writes.
   285		rsrv.req = make(chan ioSrvReq)
   286		go rsrv.ProcessRemoteIO()
   287		wsrv.req = make(chan ioSrvReq)
   288		go wsrv.ProcessRemoteIO()
   289	}
   290	
// FD is a file descriptor. The net and os packages embed this type in
// a larger type representing a network connection or OS file.
type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex

	// System file descriptor. Immutable until Close.
	Sysfd syscall.Handle

	// Read operation.
	rop operation
	// Write operation.
	wop operation

	// I/O poller.
	pd pollDesc

	// Used to implement pread/pwrite. It also serializes the other users
	// of the file position: Read and Write on files, and Seek.
	l sync.Mutex

	// For console I/O.
	lastbits       []byte   // first few bytes of the last incomplete rune in last write
	readuint16     []uint16 // buffer to hold uint16s obtained with ReadConsole
	readbyte       []byte   // buffer to hold decoding of readuint16 from utf16 to utf8
	readbyteOffset int      // readbyte[readbyteOffset:] is yet to be consumed with file.Read

	// Semaphore signaled when file is closed.
	// It is released by destroy and acquired by Close.
	csema uint32

	// Set by Init when FILE_SKIP_COMPLETION_PORT_ON_SUCCESS was enabled for
	// the handle. IO that completes immediately then produces no
	// completion message, so ExecIO must not wait for one.
	skipSyncNotif bool

	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket.
	IsStream bool

	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

	// Whether this is a file rather than a network socket.
	isFile bool

	// The kind of this file, selected by the net argument to Init.
	kind fileKind
}
   336	
   337	// fileKind describes the kind of file.
   338	type fileKind byte
   339	
   340	const (
   341		kindNet fileKind = iota
   342		kindFile
   343		kindConsole
   344		kindDir
   345		kindPipe
   346	)
   347	
   348	// logInitFD is set by tests to enable file descriptor initialization logging.
   349	var logInitFD func(net string, fd *FD, err error)
   350	
// Init initializes the FD. The Sysfd field should already be set.
// This can be called multiple times on a single FD.
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file" or "console" or "dir" or "pipe".
// Set pollable to true if fd should be managed by runtime netpoll.
// On failure it returns the name of the failing call, if any, and the error.
func (fd *FD) Init(net string, pollable bool) (string, error) {
	// Report a Winsock startup failure recorded by init.
	if initErr != nil {
		return "", initErr
	}

	switch net {
	case "file":
		fd.kind = kindFile
	case "console":
		fd.kind = kindConsole
	case "dir":
		fd.kind = kindDir
	case "pipe":
		fd.kind = kindPipe
	case "tcp", "tcp4", "tcp6",
		"udp", "udp4", "udp6",
		"ip", "ip4", "ip6",
		"unix", "unixgram", "unixpacket":
		fd.kind = kindNet
	default:
		return "", errors.New("internal error: unknown network type " + net)
	}
	fd.isFile = fd.kind != kindNet

	var err error
	if pollable {
		// Only call init for a network socket.
		// This means that we don't add files to the runtime poller.
		// Adding files to the runtime poller can confuse matters
		// if the user is doing their own overlapped I/O.
		// See issue #21172.
		//
		// In general the code below avoids calling the ExecIO
		// method for non-network sockets. If some method does
		// somehow call ExecIO, then ExecIO, and therefore the
		// calling method, will return an error, because
		// fd.pd.runtimeCtx will be 0.
		err = fd.pd.init(fd)
	}
	if logInitFD != nil {
		logInitFD(net, fd, err)
	}
	if err != nil {
		return "", err
	}
	if pollable && useSetFileCompletionNotificationModes {
		// We do not use events, so we can skip them always.
		flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE)
		// It's not safe to skip completion notifications for UDP:
		// https://blogs.technet.com/b/winserverperformance/archive/2008/06/26/designing-applications-for-high-performance-part-iii.aspx
		// NOTE(review): only the exact name "tcp" qualifies; sockets created
		// as "tcp4" or "tcp6" keep receiving completion notifications. That
		// is safe but may be unintentional — confirm.
		if net == "tcp" {
			flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
		}
		// Best effort: on failure the error is deliberately dropped (this
		// err shadows the outer one) and skipSyncNotif is not set, so
		// ExecIO keeps waiting for completion messages.
		err := syscall.SetFileCompletionNotificationModes(fd.Sysfd, flags)
		if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 {
			fd.skipSyncNotif = true
		}
	}
	// Disable SIO_UDP_CONNRESET behavior.
	// http://support.microsoft.com/kb/263823
	switch net {
	case "udp", "udp4", "udp6":
		ret := uint32(0)
		flag := uint32(0) // input value 0 turns the behavior off
		size := uint32(unsafe.Sizeof(flag))
		err := syscall.WSAIoctl(fd.Sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0)
		if err != nil {
			return "wsaioctl", err
		}
	}
	// Wire both operations back to this FD and its poller context.
	fd.rop.mode = 'r'
	fd.wop.mode = 'w'
	fd.rop.fd = fd
	fd.wop.fd = fd
	fd.rop.runtimeCtx = fd.pd.runtimeCtx
	fd.wop.runtimeCtx = fd.pd.runtimeCtx
	// Result channels are only needed for the ioSrv fallback in ExecIO.
	if !canCancelIO {
		fd.rop.errc = make(chan error)
		fd.wop.errc = make(chan error)
	}
	return "", nil
}
   438	
   439	func (fd *FD) destroy() error {
   440		if fd.Sysfd == syscall.InvalidHandle {
   441			return syscall.EINVAL
   442		}
   443		// Poller may want to unregister fd in readiness notification mechanism,
   444		// so this must be executed before fd.CloseFunc.
   445		fd.pd.close()
   446		var err error
   447		switch fd.kind {
   448		case kindNet:
   449			// The net package uses the CloseFunc variable for testing.
   450			err = CloseFunc(fd.Sysfd)
   451		case kindDir:
   452			err = syscall.FindClose(fd.Sysfd)
   453		default:
   454			err = syscall.CloseHandle(fd.Sysfd)
   455		}
   456		fd.Sysfd = syscall.InvalidHandle
   457		runtime_Semrelease(&fd.csema)
   458		return err
   459	}
   460	
   461	// Close closes the FD. The underlying file descriptor is closed by
   462	// the destroy method when there are no remaining references.
   463	func (fd *FD) Close() error {
   464		if !fd.fdmu.increfAndClose() {
   465			return errClosing(fd.isFile)
   466		}
   467		if fd.kind == kindPipe {
   468			syscall.CancelIoEx(fd.Sysfd, nil)
   469		}
   470		// unblock pending reader and writer
   471		fd.pd.evict()
   472		err := fd.decref()
   473		// Wait until the descriptor is closed. If this was the only
   474		// reference, it is already closed.
   475		runtime_Semacquire(&fd.csema)
   476		return err
   477	}
   478	
   479	// Shutdown wraps the shutdown network call.
   480	func (fd *FD) Shutdown(how int) error {
   481		if err := fd.incref(); err != nil {
   482			return err
   483		}
   484		defer fd.decref()
   485		return syscall.Shutdown(fd.Sysfd, how)
   486	}
   487	
   488	// Windows ReadFile and WSARecv use DWORD (uint32) parameter to pass buffer length.
   489	// This prevents us reading blocks larger than 4GB.
   490	// See golang.org/issue/26923.
   491	const maxRW = 1 << 30 // 1GB is large enough and keeps subsequent reads aligned
   492	
   493	// Read implements io.Reader.
   494	func (fd *FD) Read(buf []byte) (int, error) {
   495		if err := fd.readLock(); err != nil {
   496			return 0, err
   497		}
   498		defer fd.readUnlock()
   499	
   500		if len(buf) > maxRW {
   501			buf = buf[:maxRW]
   502		}
   503	
   504		var n int
   505		var err error
   506		if fd.isFile {
   507			fd.l.Lock()
   508			defer fd.l.Unlock()
   509			switch fd.kind {
   510			case kindConsole:
   511				n, err = fd.readConsole(buf)
   512			default:
   513				n, err = syscall.Read(fd.Sysfd, buf)
   514				if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
   515					// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
   516					// If the fd is a pipe and the Read was interrupted by CancelIoEx,
   517					// we assume it is interrupted by Close.
   518					err = ErrFileClosing
   519				}
   520			}
   521			if err != nil {
   522				n = 0
   523			}
   524		} else {
   525			o := &fd.rop
   526			o.InitBuf(buf)
   527			n, err = rsrv.ExecIO(o, func(o *operation) error {
   528				return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
   529			})
   530			if race.Enabled {
   531				race.Acquire(unsafe.Pointer(&ioSync))
   532			}
   533		}
   534		if len(buf) != 0 {
   535			err = fd.eofError(n, err)
   536		}
   537		return n, err
   538	}
   539	
// ReadConsole is the function readConsole uses to read UTF-16 console
// input; it is a variable so that tests can replace it.
var ReadConsole = syscall.ReadConsole // changed for testing

// readConsole reads utf16 characters from console File,
// encodes them into utf8 and stores them in buffer b.
// It returns the number of utf8 bytes read and an error, if any.
//
// Decoded bytes that do not fit in b stay in fd.readbyte and are returned
// by later calls before the console is read again. A Ctrl-Z (0x1A) byte
// ends the data returned by a call. If it is the first byte, it is
// consumed and 0 is returned.
func (fd *FD) readConsole(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}

	if fd.readuint16 == nil {
		// Note: syscall.ReadConsole fails for very large buffers.
		// The limit is somewhere around (but not exactly) 16384.
		// Stay well below.
		fd.readuint16 = make([]uint16, 0, 10000)
		// A UTF-16 unit never decodes to more than 4 UTF-8 bytes, so this
		// always holds a full decoding of readuint16.
		fd.readbyte = make([]byte, 0, 4*cap(fd.readuint16))
	}

	// Refill readbyte from the console only once it has been fully consumed.
	for fd.readbyteOffset >= len(fd.readbyte) {
		// Request no more units than fit after any saved half surrogate,
		// and no more than len(b).
		n := cap(fd.readuint16) - len(fd.readuint16)
		if n > len(b) {
			n = len(b)
		}
		var nw uint32
		// Read into readuint16 just past any saved half surrogate.
		err := ReadConsole(fd.Sysfd, &fd.readuint16[:len(fd.readuint16)+1][len(fd.readuint16)], uint32(n), &nw, nil)
		if err != nil {
			return 0, err
		}
		uint16s := fd.readuint16[:len(fd.readuint16)+int(nw)]
		fd.readuint16 = fd.readuint16[:0]
		buf := fd.readbyte[:0]
		for i := 0; i < len(uint16s); i++ {
			r := rune(uint16s[i])
			if utf16.IsSurrogate(r) {
				if i+1 == len(uint16s) {
					if nw > 0 {
						// Save half surrogate pair for next time.
						fd.readuint16 = fd.readuint16[:1]
						fd.readuint16[0] = uint16(r)
						break
					}
					// Nothing new was read, so the pair cannot be completed.
					r = utf8.RuneError
				} else {
					r = utf16.DecodeRune(r, rune(uint16s[i+1]))
					// Consume the second unit only if it completed a valid pair.
					if r != utf8.RuneError {
						i++
					}
				}
			}
			n := utf8.EncodeRune(buf[len(buf):cap(buf)], r)
			buf = buf[:len(buf)+n]
		}
		fd.readbyte = buf
		fd.readbyteOffset = 0
		// A zero-length console read ends the refill loop even if nothing
		// was decoded.
		if nw == 0 {
			break
		}
	}

	src := fd.readbyte[fd.readbyteOffset:]
	var i int
	for i = 0; i < len(src) && i < len(b); i++ {
		x := src[i]
		if x == 0x1A { // Ctrl-Z
			// A leading Ctrl-Z is consumed here. A later one is left for
			// the next call, which will then return 0 bytes.
			if i == 0 {
				fd.readbyteOffset++
			}
			break
		}
		b[i] = x
	}
	fd.readbyteOffset += i
	return i, nil
}
   614	
   615	// Pread emulates the Unix pread system call.
   616	func (fd *FD) Pread(b []byte, off int64) (int, error) {
   617		// Call incref, not readLock, because since pread specifies the
   618		// offset it is independent from other reads.
   619		if err := fd.incref(); err != nil {
   620			return 0, err
   621		}
   622		defer fd.decref()
   623	
   624		if len(b) > maxRW {
   625			b = b[:maxRW]
   626		}
   627	
   628		fd.l.Lock()
   629		defer fd.l.Unlock()
   630		curoffset, e := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent)
   631		if e != nil {
   632			return 0, e
   633		}
   634		defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart)
   635		o := syscall.Overlapped{
   636			OffsetHigh: uint32(off >> 32),
   637			Offset:     uint32(off),
   638		}
   639		var done uint32
   640		e = syscall.ReadFile(fd.Sysfd, b, &done, &o)
   641		if e != nil {
   642			done = 0
   643			if e == syscall.ERROR_HANDLE_EOF {
   644				e = io.EOF
   645			}
   646		}
   647		if len(b) != 0 {
   648			e = fd.eofError(int(done), e)
   649		}
   650		return int(done), e
   651	}
   652	
   653	// ReadFrom wraps the recvfrom network call.
   654	func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
   655		if len(buf) == 0 {
   656			return 0, nil, nil
   657		}
   658		if len(buf) > maxRW {
   659			buf = buf[:maxRW]
   660		}
   661		if err := fd.readLock(); err != nil {
   662			return 0, nil, err
   663		}
   664		defer fd.readUnlock()
   665		o := &fd.rop
   666		o.InitBuf(buf)
   667		n, err := rsrv.ExecIO(o, func(o *operation) error {
   668			if o.rsa == nil {
   669				o.rsa = new(syscall.RawSockaddrAny)
   670			}
   671			o.rsan = int32(unsafe.Sizeof(*o.rsa))
   672			return syscall.WSARecvFrom(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil)
   673		})
   674		err = fd.eofError(n, err)
   675		if err != nil {
   676			return n, nil, err
   677		}
   678		sa, _ := o.rsa.Sockaddr()
   679		return n, sa, nil
   680	}
   681	
   682	// Write implements io.Writer.
   683	func (fd *FD) Write(buf []byte) (int, error) {
   684		if err := fd.writeLock(); err != nil {
   685			return 0, err
   686		}
   687		defer fd.writeUnlock()
   688		if fd.isFile {
   689			fd.l.Lock()
   690			defer fd.l.Unlock()
   691		}
   692	
   693		ntotal := 0
   694		for len(buf) > 0 {
   695			b := buf
   696			if len(b) > maxRW {
   697				b = b[:maxRW]
   698			}
   699			var n int
   700			var err error
   701			if fd.isFile {
   702				switch fd.kind {
   703				case kindConsole:
   704					n, err = fd.writeConsole(b)
   705				default:
   706					n, err = syscall.Write(fd.Sysfd, b)
   707					if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
   708						// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
   709						// If the fd is a pipe and the Write was interrupted by CancelIoEx,
   710						// we assume it is interrupted by Close.
   711						err = ErrFileClosing
   712					}
   713				}
   714				if err != nil {
   715					n = 0
   716				}
   717			} else {
   718				if race.Enabled {
   719					race.ReleaseMerge(unsafe.Pointer(&ioSync))
   720				}
   721				o := &fd.wop
   722				o.InitBuf(b)
   723				n, err = wsrv.ExecIO(o, func(o *operation) error {
   724					return syscall.WSASend(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
   725				})
   726			}
   727			ntotal += n
   728			if err != nil {
   729				return ntotal, err
   730			}
   731			buf = buf[n:]
   732		}
   733		return ntotal, nil
   734	}
   735	
   736	// writeConsole writes len(b) bytes to the console File.
   737	// It returns the number of bytes written and an error, if any.
   738	func (fd *FD) writeConsole(b []byte) (int, error) {
   739		n := len(b)
   740		runes := make([]rune, 0, 256)
   741		if len(fd.lastbits) > 0 {
   742			b = append(fd.lastbits, b...)
   743			fd.lastbits = nil
   744	
   745		}
   746		for len(b) >= utf8.UTFMax || utf8.FullRune(b) {
   747			r, l := utf8.DecodeRune(b)
   748			runes = append(runes, r)
   749			b = b[l:]
   750		}
   751		if len(b) > 0 {
   752			fd.lastbits = make([]byte, len(b))
   753			copy(fd.lastbits, b)
   754		}
   755		// syscall.WriteConsole seems to fail, if given large buffer.
   756		// So limit the buffer to 16000 characters. This number was
   757		// discovered by experimenting with syscall.WriteConsole.
   758		const maxWrite = 16000
   759		for len(runes) > 0 {
   760			m := len(runes)
   761			if m > maxWrite {
   762				m = maxWrite
   763			}
   764			chunk := runes[:m]
   765			runes = runes[m:]
   766			uint16s := utf16.Encode(chunk)
   767			for len(uint16s) > 0 {
   768				var written uint32
   769				err := syscall.WriteConsole(fd.Sysfd, &uint16s[0], uint32(len(uint16s)), &written, nil)
   770				if err != nil {
   771					return 0, err
   772				}
   773				uint16s = uint16s[written:]
   774			}
   775		}
   776		return n, nil
   777	}
   778	
   779	// Pwrite emulates the Unix pwrite system call.
   780	func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
   781		// Call incref, not writeLock, because since pwrite specifies the
   782		// offset it is independent from other writes.
   783		if err := fd.incref(); err != nil {
   784			return 0, err
   785		}
   786		defer fd.decref()
   787	
   788		fd.l.Lock()
   789		defer fd.l.Unlock()
   790		curoffset, e := syscall.Seek(fd.Sysfd, 0, io.SeekCurrent)
   791		if e != nil {
   792			return 0, e
   793		}
   794		defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart)
   795	
   796		ntotal := 0
   797		for len(buf) > 0 {
   798			b := buf
   799			if len(b) > maxRW {
   800				b = b[:maxRW]
   801			}
   802			var n uint32
   803			o := syscall.Overlapped{
   804				OffsetHigh: uint32(off >> 32),
   805				Offset:     uint32(off),
   806			}
   807			e = syscall.WriteFile(fd.Sysfd, b, &n, &o)
   808			ntotal += int(n)
   809			if e != nil {
   810				return ntotal, e
   811			}
   812			buf = buf[n:]
   813			off += int64(n)
   814		}
   815		return ntotal, nil
   816	}
   817	
   818	// Writev emulates the Unix writev system call.
   819	func (fd *FD) Writev(buf *[][]byte) (int64, error) {
   820		if len(*buf) == 0 {
   821			return 0, nil
   822		}
   823		if err := fd.writeLock(); err != nil {
   824			return 0, err
   825		}
   826		defer fd.writeUnlock()
   827		if race.Enabled {
   828			race.ReleaseMerge(unsafe.Pointer(&ioSync))
   829		}
   830		o := &fd.wop
   831		o.InitBufs(buf)
   832		n, err := wsrv.ExecIO(o, func(o *operation) error {
   833			return syscall.WSASend(o.fd.Sysfd, &o.bufs[0], uint32(len(o.bufs)), &o.qty, 0, &o.o, nil)
   834		})
   835		o.ClearBufs()
   836		TestHookDidWritev(n)
   837		consume(buf, int64(n))
   838		return int64(n), err
   839	}
   840	
   841	// WriteTo wraps the sendto network call.
   842	func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
   843		if err := fd.writeLock(); err != nil {
   844			return 0, err
   845		}
   846		defer fd.writeUnlock()
   847	
   848		if len(buf) == 0 {
   849			// handle zero-byte payload
   850			o := &fd.wop
   851			o.InitBuf(buf)
   852			o.sa = sa
   853			n, err := wsrv.ExecIO(o, func(o *operation) error {
   854				return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
   855			})
   856			return n, err
   857		}
   858	
   859		ntotal := 0
   860		for len(buf) > 0 {
   861			b := buf
   862			if len(b) > maxRW {
   863				b = b[:maxRW]
   864			}
   865			o := &fd.wop
   866			o.InitBuf(b)
   867			o.sa = sa
   868			n, err := wsrv.ExecIO(o, func(o *operation) error {
   869				return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
   870			})
   871			ntotal += int(n)
   872			if err != nil {
   873				return ntotal, err
   874			}
   875			buf = buf[n:]
   876		}
   877		return ntotal, nil
   878	}
   879	
   880	// Call ConnectEx. This doesn't need any locking, since it is only
   881	// called when the descriptor is first created. This is here rather
   882	// than in the net package so that it can use fd.wop.
   883	func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
   884		o := &fd.wop
   885		o.sa = ra
   886		_, err := wsrv.ExecIO(o, func(o *operation) error {
   887			return ConnectExFunc(o.fd.Sysfd, o.sa, nil, 0, nil, &o.o)
   888		})
   889		return err
   890	}
   891	
   892	func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny, o *operation) (string, error) {
   893		// Submit accept request.
   894		o.handle = s
   895		o.rsan = int32(unsafe.Sizeof(rawsa[0]))
   896		_, err := rsrv.ExecIO(o, func(o *operation) error {
   897			return AcceptFunc(o.fd.Sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
   898		})
   899		if err != nil {
   900			CloseFunc(s)
   901			return "acceptex", err
   902		}
   903	
   904		// Inherit properties of the listening socket.
   905		err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.Sysfd)), int32(unsafe.Sizeof(fd.Sysfd)))
   906		if err != nil {
   907			CloseFunc(s)
   908			return "setsockopt", err
   909		}
   910	
   911		return "", nil
   912	}
   913	
   914	// Accept handles accepting a socket. The sysSocket parameter is used
   915	// to allocate the net socket.
   916	func (fd *FD) Accept(sysSocket func() (syscall.Handle, error)) (syscall.Handle, []syscall.RawSockaddrAny, uint32, string, error) {
   917		if err := fd.readLock(); err != nil {
   918			return syscall.InvalidHandle, nil, 0, "", err
   919		}
   920		defer fd.readUnlock()
   921	
   922		o := &fd.rop
   923		var rawsa [2]syscall.RawSockaddrAny
   924		for {
   925			s, err := sysSocket()
   926			if err != nil {
   927				return syscall.InvalidHandle, nil, 0, "", err
   928			}
   929	
   930			errcall, err := fd.acceptOne(s, rawsa[:], o)
   931			if err == nil {
   932				return s, rawsa[:], uint32(o.rsan), "", nil
   933			}
   934	
   935			// Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is
   936			// returned here. These happen if connection reset is received
   937			// before AcceptEx could complete. These errors relate to new
   938			// connection, not to AcceptEx, so ignore broken connection and
   939			// try AcceptEx again for more connections.
   940			errno, ok := err.(syscall.Errno)
   941			if !ok {
   942				return syscall.InvalidHandle, nil, 0, errcall, err
   943			}
   944			switch errno {
   945			case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET:
   946				// ignore these and try again
   947			default:
   948				return syscall.InvalidHandle, nil, 0, errcall, err
   949			}
   950		}
   951	}
   952	
   953	// Seek wraps syscall.Seek.
   954	func (fd *FD) Seek(offset int64, whence int) (int64, error) {
   955		if err := fd.incref(); err != nil {
   956			return 0, err
   957		}
   958		defer fd.decref()
   959	
   960		fd.l.Lock()
   961		defer fd.l.Unlock()
   962	
   963		return syscall.Seek(fd.Sysfd, offset, whence)
   964	}
   965	
   966	// FindNextFile wraps syscall.FindNextFile.
   967	func (fd *FD) FindNextFile(data *syscall.Win32finddata) error {
   968		if err := fd.incref(); err != nil {
   969			return err
   970		}
   971		defer fd.decref()
   972		return syscall.FindNextFile(fd.Sysfd, data)
   973	}
   974	
   975	// Fchdir wraps syscall.Fchdir.
   976	func (fd *FD) Fchdir() error {
   977		if err := fd.incref(); err != nil {
   978			return err
   979		}
   980		defer fd.decref()
   981		return syscall.Fchdir(fd.Sysfd)
   982	}
   983	
   984	// GetFileType wraps syscall.GetFileType.
   985	func (fd *FD) GetFileType() (uint32, error) {
   986		if err := fd.incref(); err != nil {
   987			return 0, err
   988		}
   989		defer fd.decref()
   990		return syscall.GetFileType(fd.Sysfd)
   991	}
   992	
   993	// GetFileInformationByHandle wraps GetFileInformationByHandle.
   994	func (fd *FD) GetFileInformationByHandle(data *syscall.ByHandleFileInformation) error {
   995		if err := fd.incref(); err != nil {
   996			return err
   997		}
   998		defer fd.decref()
   999		return syscall.GetFileInformationByHandle(fd.Sysfd, data)
  1000	}
  1001	
  1002	// RawControl invokes the user-defined function f for a non-IO
  1003	// operation.
  1004	func (fd *FD) RawControl(f func(uintptr)) error {
  1005		if err := fd.incref(); err != nil {
  1006			return err
  1007		}
  1008		defer fd.decref()
  1009		f(uintptr(fd.Sysfd))
  1010		return nil
  1011	}
  1012	
  1013	// RawRead invokes the user-defined function f for a read operation.
  1014	func (fd *FD) RawRead(f func(uintptr) bool) error {
  1015		if err := fd.readLock(); err != nil {
  1016			return err
  1017		}
  1018		defer fd.readUnlock()
  1019		for {
  1020			if f(uintptr(fd.Sysfd)) {
  1021				return nil
  1022			}
  1023	
  1024			// Use a zero-byte read as a way to get notified when this
  1025			// socket is readable. h/t https://stackoverflow.com/a/42019668/332798
  1026			o := &fd.rop
  1027			o.InitBuf(nil)
  1028			if !fd.IsStream {
  1029				o.flags |= windows.MSG_PEEK
  1030			}
  1031			_, err := rsrv.ExecIO(o, func(o *operation) error {
  1032				return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
  1033			})
  1034			if err == windows.WSAEMSGSIZE {
  1035				// expected with a 0-byte peek, ignore.
  1036			} else if err != nil {
  1037				return err
  1038			}
  1039		}
  1040	}
  1041	
  1042	// RawWrite invokes the user-defined function f for a write operation.
  1043	func (fd *FD) RawWrite(f func(uintptr) bool) error {
  1044		if err := fd.writeLock(); err != nil {
  1045			return err
  1046		}
  1047		defer fd.writeUnlock()
  1048	
  1049		if f(uintptr(fd.Sysfd)) {
  1050			return nil
  1051		}
  1052	
  1053		// TODO(tmm1): find a way to detect socket writability
  1054		return syscall.EWINDOWS
  1055	}
  1056	
  1057	func sockaddrToRaw(sa syscall.Sockaddr) (unsafe.Pointer, int32, error) {
  1058		switch sa := sa.(type) {
  1059		case *syscall.SockaddrInet4:
  1060			var raw syscall.RawSockaddrInet4
  1061			raw.Family = syscall.AF_INET
  1062			p := (*[2]byte)(unsafe.Pointer(&raw.Port))
  1063			p[0] = byte(sa.Port >> 8)
  1064			p[1] = byte(sa.Port)
  1065			for i := 0; i < len(sa.Addr); i++ {
  1066				raw.Addr[i] = sa.Addr[i]
  1067			}
  1068			return unsafe.Pointer(&raw), int32(unsafe.Sizeof(raw)), nil
  1069		case *syscall.SockaddrInet6:
  1070			var raw syscall.RawSockaddrInet6
  1071			raw.Family = syscall.AF_INET6
  1072			p := (*[2]byte)(unsafe.Pointer(&raw.Port))
  1073			p[0] = byte(sa.Port >> 8)
  1074			p[1] = byte(sa.Port)
  1075			raw.Scope_id = sa.ZoneId
  1076			for i := 0; i < len(sa.Addr); i++ {
  1077				raw.Addr[i] = sa.Addr[i]
  1078			}
  1079			return unsafe.Pointer(&raw), int32(unsafe.Sizeof(raw)), nil
  1080		default:
  1081			return nil, 0, syscall.EWINDOWS
  1082		}
  1083	}
  1084	
  1085	// ReadMsg wraps the WSARecvMsg network call.
  1086	func (fd *FD) ReadMsg(p []byte, oob []byte) (int, int, int, syscall.Sockaddr, error) {
  1087		if err := fd.readLock(); err != nil {
  1088			return 0, 0, 0, nil, err
  1089		}
  1090		defer fd.readUnlock()
  1091	
  1092		if len(p) > maxRW {
  1093			p = p[:maxRW]
  1094		}
  1095	
  1096		o := &fd.rop
  1097		o.InitMsg(p, oob)
  1098		o.rsa = new(syscall.RawSockaddrAny)
  1099		o.msg.Name = o.rsa
  1100		o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa))
  1101		n, err := rsrv.ExecIO(o, func(o *operation) error {
  1102			return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil)
  1103		})
  1104		err = fd.eofError(n, err)
  1105		var sa syscall.Sockaddr
  1106		if err == nil {
  1107			sa, err = o.rsa.Sockaddr()
  1108		}
  1109		return n, int(o.msg.Control.Len), int(o.msg.Flags), sa, err
  1110	}
  1111	
  1112	// WriteMsg wraps the WSASendMsg network call.
  1113	func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, error) {
  1114		if len(p) > maxRW {
  1115			return 0, 0, errors.New("packet is too large (only 1GB is allowed)")
  1116		}
  1117	
  1118		if err := fd.writeLock(); err != nil {
  1119			return 0, 0, err
  1120		}
  1121		defer fd.writeUnlock()
  1122	
  1123		o := &fd.wop
  1124		o.InitMsg(p, oob)
  1125		if sa != nil {
  1126			rsa, len, err := sockaddrToRaw(sa)
  1127			if err != nil {
  1128				return 0, 0, err
  1129			}
  1130			o.msg.Name = (*syscall.RawSockaddrAny)(rsa)
  1131			o.msg.Namelen = len
  1132		}
  1133		n, err := wsrv.ExecIO(o, func(o *operation) error {
  1134			return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil)
  1135		})
  1136		return n, int(o.msg.Control.Len), err
  1137	}
  1138	

View as plain text