...

Source file src/internal/poll/fd_mutex.go

     1	// Copyright 2013 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package poll
     6	
     7	import "sync/atomic"
     8	
     9	// fdMutex is a specialized synchronization primitive that manages
    10	// lifetime of an fd and serializes access to Read, Write and Close
    11	// methods on FD.
    12	type fdMutex struct {
    13		state uint64
    14		rsema uint32
    15		wsema uint32
    16	}
    17	
    18	// fdMutex.state is organized as follows:
    19	// 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
    20	// 1 bit - lock for read operations.
    21	// 1 bit - lock for write operations.
    22	// 20 bits - total number of references (read+write+misc).
    23	// 20 bits - number of outstanding read waiters.
    24	// 20 bits - number of outstanding write waiters.
    25	const (
    26		mutexClosed  = 1 << 0
    27		mutexRLock   = 1 << 1
    28		mutexWLock   = 1 << 2
    29		mutexRef     = 1 << 3
    30		mutexRefMask = (1<<20 - 1) << 3
    31		mutexRWait   = 1 << 23
    32		mutexRMask   = (1<<20 - 1) << 23
    33		mutexWWait   = 1 << 43
    34		mutexWMask   = (1<<20 - 1) << 43
    35	)
    36	
    37	const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
    38	
    39	// Read operations must do rwlock(true)/rwunlock(true).
    40	//
    41	// Write operations must do rwlock(false)/rwunlock(false).
    42	//
    43	// Misc operations must do incref/decref.
    44	// Misc operations include functions like setsockopt and setDeadline.
    45	// They need to use incref/decref to ensure that they operate on the
    46	// correct fd in presence of a concurrent close call (otherwise fd can
    47	// be closed under their feet).
    48	//
    49	// Close operations must do increfAndClose/decref.
    50	
    51	// incref adds a reference to mu.
    52	// It reports whether mu is available for reading or writing.
    53	func (mu *fdMutex) incref() bool {
    54		for {
    55			old := atomic.LoadUint64(&mu.state)
    56			if old&mutexClosed != 0 {
    57				return false
    58			}
    59			new := old + mutexRef
    60			if new&mutexRefMask == 0 {
    61				panic(overflowMsg)
    62			}
    63			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    64				return true
    65			}
    66		}
    67	}
    68	
    69	// increfAndClose sets the state of mu to closed.
    70	// It returns false if the file was already closed.
    71	func (mu *fdMutex) increfAndClose() bool {
    72		for {
    73			old := atomic.LoadUint64(&mu.state)
    74			if old&mutexClosed != 0 {
    75				return false
    76			}
    77			// Mark as closed and acquire a reference.
    78			new := (old | mutexClosed) + mutexRef
    79			if new&mutexRefMask == 0 {
    80				panic(overflowMsg)
    81			}
    82			// Remove all read and write waiters.
    83			new &^= mutexRMask | mutexWMask
    84			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    85				// Wake all read and write waiters,
    86				// they will observe closed flag after wakeup.
    87				for old&mutexRMask != 0 {
    88					old -= mutexRWait
    89					runtime_Semrelease(&mu.rsema)
    90				}
    91				for old&mutexWMask != 0 {
    92					old -= mutexWWait
    93					runtime_Semrelease(&mu.wsema)
    94				}
    95				return true
    96			}
    97		}
    98	}
    99	
   100	// decref removes a reference from mu.
   101	// It reports whether there is no remaining reference.
   102	func (mu *fdMutex) decref() bool {
   103		for {
   104			old := atomic.LoadUint64(&mu.state)
   105			if old&mutexRefMask == 0 {
   106				panic("inconsistent poll.fdMutex")
   107			}
   108			new := old - mutexRef
   109			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   110				return new&(mutexClosed|mutexRefMask) == mutexClosed
   111			}
   112		}
   113	}
   114	
   115	// lock adds a reference to mu and locks mu.
   116	// It reports whether mu is available for reading or writing.
   117	func (mu *fdMutex) rwlock(read bool) bool {
   118		var mutexBit, mutexWait, mutexMask uint64
   119		var mutexSema *uint32
   120		if read {
   121			mutexBit = mutexRLock
   122			mutexWait = mutexRWait
   123			mutexMask = mutexRMask
   124			mutexSema = &mu.rsema
   125		} else {
   126			mutexBit = mutexWLock
   127			mutexWait = mutexWWait
   128			mutexMask = mutexWMask
   129			mutexSema = &mu.wsema
   130		}
   131		for {
   132			old := atomic.LoadUint64(&mu.state)
   133			if old&mutexClosed != 0 {
   134				return false
   135			}
   136			var new uint64
   137			if old&mutexBit == 0 {
   138				// Lock is free, acquire it.
   139				new = (old | mutexBit) + mutexRef
   140				if new&mutexRefMask == 0 {
   141					panic(overflowMsg)
   142				}
   143			} else {
   144				// Wait for lock.
   145				new = old + mutexWait
   146				if new&mutexMask == 0 {
   147					panic(overflowMsg)
   148				}
   149			}
   150			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   151				if old&mutexBit == 0 {
   152					return true
   153				}
   154				runtime_Semacquire(mutexSema)
   155				// The signaller has subtracted mutexWait.
   156			}
   157		}
   158	}
   159	
   160	// unlock removes a reference from mu and unlocks mu.
   161	// It reports whether there is no remaining reference.
   162	func (mu *fdMutex) rwunlock(read bool) bool {
   163		var mutexBit, mutexWait, mutexMask uint64
   164		var mutexSema *uint32
   165		if read {
   166			mutexBit = mutexRLock
   167			mutexWait = mutexRWait
   168			mutexMask = mutexRMask
   169			mutexSema = &mu.rsema
   170		} else {
   171			mutexBit = mutexWLock
   172			mutexWait = mutexWWait
   173			mutexMask = mutexWMask
   174			mutexSema = &mu.wsema
   175		}
   176		for {
   177			old := atomic.LoadUint64(&mu.state)
   178			if old&mutexBit == 0 || old&mutexRefMask == 0 {
   179				panic("inconsistent poll.fdMutex")
   180			}
   181			// Drop lock, drop reference and wake read waiter if present.
   182			new := (old &^ mutexBit) - mutexRef
   183			if old&mutexMask != 0 {
   184				new -= mutexWait
   185			}
   186			if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   187				if old&mutexMask != 0 {
   188					runtime_Semrelease(mutexSema)
   189				}
   190				return new&(mutexClosed|mutexRefMask) == mutexClosed
   191			}
   192		}
   193	}
   194	
   195	// Implemented in runtime package.
   196	func runtime_Semacquire(sema *uint32)
   197	func runtime_Semrelease(sema *uint32)
   198	
   199	// incref adds a reference to fd.
   200	// It returns an error when fd cannot be used.
   201	func (fd *FD) incref() error {
   202		if !fd.fdmu.incref() {
   203			return errClosing(fd.isFile)
   204		}
   205		return nil
   206	}
   207	
   208	// decref removes a reference from fd.
   209	// It also closes fd when the state of fd is set to closed and there
   210	// is no remaining reference.
   211	func (fd *FD) decref() error {
   212		if fd.fdmu.decref() {
   213			return fd.destroy()
   214		}
   215		return nil
   216	}
   217	
   218	// readLock adds a reference to fd and locks fd for reading.
   219	// It returns an error when fd cannot be used for reading.
   220	func (fd *FD) readLock() error {
   221		if !fd.fdmu.rwlock(true) {
   222			return errClosing(fd.isFile)
   223		}
   224		return nil
   225	}
   226	
   227	// readUnlock removes a reference from fd and unlocks fd for reading.
   228	// It also closes fd when the state of fd is set to closed and there
   229	// is no remaining reference.
   230	func (fd *FD) readUnlock() {
   231		if fd.fdmu.rwunlock(true) {
   232			fd.destroy()
   233		}
   234	}
   235	
   236	// writeLock adds a reference to fd and locks fd for writing.
   237	// It returns an error when fd cannot be used for writing.
   238	func (fd *FD) writeLock() error {
   239		if !fd.fdmu.rwlock(false) {
   240			return errClosing(fd.isFile)
   241		}
   242		return nil
   243	}
   244	
   245	// writeUnlock removes a reference from fd and unlocks fd for writing.
   246	// It also closes fd when the state of fd is set to closed and there
   247	// is no remaining reference.
   248	func (fd *FD) writeUnlock() {
   249		if fd.fdmu.rwunlock(false) {
   250			fd.destroy()
   251		}
   252	}
   253	

View as plain text