diff --git a/server/filestore.go b/server/filestore.go index 423fe3d2440..2b45a578552 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -474,13 +474,24 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Check if our prior state remembers a last sequence past where we can see. if fs.ld != nil && prior.LastSeq > fs.state.LastSeq { fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime + // writeTombstone & newMsgBlockForWrite require a lock + preemptiveLock := !preEmptLock(&fs.mu) if _, err := fs.newMsgBlockForWrite(); err == nil { if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil { + if preemptiveLock { + logUnLock(&fs.mu) + } return nil, err } } else { + if preemptiveLock { + logUnLock(&fs.mu) + } return nil, err } + if preemptiveLock { + logUnLock(&fs.mu) + } } // Since we recovered here, make sure to kick ourselves to write out our stream state. fs.dirty++ @@ -549,9 +560,17 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile) fi, err := os.Stat(meta) if err != nil && os.IsNotExist(err) || fi != nil && fi.Size() == 0 { + // writeStreamMeta is often called without a lock being held + preemptiveLock := !preEmptLock(&fs.mu) if err := fs.writeStreamMeta(); err != nil { + if preemptiveLock { + logUnLock(&fs.mu) + } return nil, err } + if preemptiveLock { + logUnLock(&fs.mu) + } } // If we expect to be encrypted check that what we are restoring is not plaintext. @@ -766,6 +785,13 @@ func genBlockEncryptionKey(sc StoreCipher, seed, nonce []byte) (cipher.Stream, e // Lock should be held. func (fs *fileStore) recoverAEK() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return ErrNoLockHeld + } + if fs.prf != nil && fs.aek == nil { ekey, err := os.ReadFile(filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)) if err != nil { @@ -795,6 +821,13 @@ func (fs *fileStore) recoverAEK() error { // Lock should be held. func (fs *fileStore) setupAEK() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return ErrNoLockHeld + } + if fs.prf != nil && fs.aek == nil { key, _, _, encrypted, err := fs.genEncryptionKeys(fs.cfg.Name) if err != nil { @@ -817,6 +850,13 @@ func (fs *fileStore) setupAEK() error { // Write out meta and the checksum. // Lock should be held. func (fs *fileStore) writeStreamMeta() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return ErrNoLockHeld + } + if err := fs.setupAEK(); err != nil { return err } @@ -914,6 +954,13 @@ const ( // Lock should be held. func (fs *fileStore) noTrackSubjects() bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held) + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } return !(fs.psim.Size() > 0 || len(fs.cfg.Subjects) > 0 || fs.cfg.Mirror != nil || len(fs.cfg.Sources) > 0) } @@ -1001,6 +1048,16 @@ func (fs *fileStore) loadEncryptionForMsgBlock(mb *msgBlock) error { // Load a last checksum if needed from the block file. // Lock should be held. func (mb *msgBlock) ensureLastChecksumLoaded() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + var empty [8]byte if mb.lchk != empty { return @@ -1013,8 +1070,14 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { mb := fs.initMsgBlock(index) // Open up the message file, but we will try to recover from the index file. // We will check that the last checksums match. + + // openBlock, loadBlock, populateGlobalPerSubjectInfo, addLostData, + // populateGlobalPerSubjectInfo, and addMsgBlock all require a lock. + mb.mu.Lock() + file, err := mb.openBlock() if err != nil { + mb.mu.Unlock() return nil, err } defer file.Close() @@ -1022,6 +1085,7 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { if fi, err := file.Stat(); fi != nil { mb.rbytes = uint64(fi.Size()) } else { + mb.mu.Unlock() return nil, err } @@ -1042,6 +1106,7 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { } file.Close() + mb.mu.Unlock() // Read our index file. Use this as source of truth if possible. // This not applicable in >= 2.10 servers. Here for upgrade paths from < 2.10. @@ -1050,11 +1115,15 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { // Note this only checks that the message blk file is not newer then this file, or is empty and we expect empty. if (mb.rbytes == 0 && mb.msgs == 0) || bytes.Equal(lchk[:], mb.lchk[:]) { if mb.msgs > 0 && !mb.noTrack && fs.psim != nil { + mb.mu.Lock() fs.populateGlobalPerSubjectInfo(mb) + mb.mu.Unlock() // Try to dump any state we needed on recovery. mb.tryForceExpireCacheLocked() } + mb.mu.Lock() fs.addMsgBlock(mb) + mb.mu.Unlock() return mb, nil } } @@ -1070,7 +1139,9 @@ func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { } if mb.msgs > 0 && !mb.noTrack && fs.psim != nil { + mb.mu.Lock() fs.populateGlobalPerSubjectInfo(mb) + mb.mu.Unlock() // Try to dump any state we needed on recovery. mb.tryForceExpireCacheLocked() } @@ -1093,6 +1164,9 @@ func (fs *fileStore) lostData() *LostStreamData { // Lock should be held. func (fs *fileStore) addLostData(ld *LostStreamData) { + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } if ld == nil { return } @@ -1142,6 +1216,10 @@ func (fs *fileStore) rebuildState(ld *LostStreamData) { // Lock should be held. func (fs *fileStore) rebuildStateLocked(ld *LostStreamData) { + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + fs.addLostData(ld) fs.state.Msgs, fs.state.Bytes = 0, 0 @@ -1290,6 +1368,12 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, []uint64, error) { // Rebuild the state of the blk based on what we have on disk in the N.blk file. // Lock should be held. func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, nil, ErrNoLockHeld + } startLastSeq := atomic.LoadUint64(&mb.last.seq) // Remove the .fss file and clear any cache we have set. @@ -1529,6 +1613,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { // For doing warn logging. // Lock should be held. func (fs *fileStore) warn(format string, args ...any) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + // No-op if no server configured. if fs.srv == nil { return @@ -1539,6 +1633,16 @@ func (fs *fileStore) warn(format string, args ...any) { // For doing debug logging. // Lock should be held. func (fs *fileStore) debug(format string, args ...any) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + // No-op if no server configured. if fs.srv == nil { return @@ -1911,7 +2015,9 @@ func (fs *fileStore) recoverTTLState() error { // Grabs last checksum for the named block file. // Takes into account encryption etc. func (mb *msgBlock) lastChecksum() []byte { + mb.mu.Lock() f, err := mb.openBlock() + mb.mu.Unlock() if err != nil { return nil } @@ -2609,6 +2715,16 @@ func (mb *msgBlock) filteredPending(subj string, wc bool, seq uint64) (total, fi // This will traverse a message block and generate the filtered pending. // Lock should be held. func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (total, first, last uint64) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + isAll := filter == _EMPTY_ || filter == fwcs // First check if we can optimize this part. @@ -2805,12 +2921,32 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, e // Optimized way for getting all num pending matching a filter subject. // Lock should be held. func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + fs.numFilteredPendingWithLast(filter, true, ss) } // Optimized way for getting all num pending matching a filter subject and first sequence only. // Lock should be held. func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + fs.numFilteredPendingWithLast(filter, false, ss) } @@ -2818,6 +2954,16 @@ func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) { // Optionally look up last sequence. Sometimes do not need last and this avoids cost. // Read lock should be held. func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + isAll := filter == _EMPTY_ || filter == fwcs // If isAll we do not need to do anything special to calculate the first and last and total. @@ -3775,6 +3921,15 @@ func (fs *fileStore) RegisterSubjectDeleteMarkerUpdates(cb SubjectDeleteMarkerUp // Helper to get hash key for specific message block. // Lock should be held func (fs *fileStore) hashKeyForBlock(index uint32) []byte { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } return []byte(fmt.Sprintf("%s-%d", fs.cfg.Name, index)) } @@ -3803,6 +3958,13 @@ func (mb *msgBlock) setupWriteCache(buf []byte) { // This rolls to a new append msg block. // Lock should be held. func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return nil, ErrNoLockHeld + } + index := uint32(1) var rbuf []byte @@ -3903,6 +4065,13 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t return ErrStoreClosed } + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return ErrNoLockHeld + } + // Per subject max check needed. mmp := uint64(fs.cfg.MaxMsgsPer) var psmc uint64 @@ -4232,6 +4401,16 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error { // Lock should be held. func (fs *fileStore) rebuildFirst() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if len(fs.blks) == 0 { return } @@ -4258,6 +4437,13 @@ func (fs *fileStore) rebuildFirst() { // We assume a fast check that this subj even exists already happened. // Write lock should be held. func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return 0, ErrNoLockHeld + } + if len(fs.blks) == 0 { return 0, nil } @@ -4323,6 +4509,16 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) { // Will check the msg limit and drop firstSeq msg if needed. // Lock should be held. func (fs *fileStore) enforceMsgLimit() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if fs.cfg.Discard != DiscardOld { return } @@ -4340,6 +4536,16 @@ func (fs *fileStore) enforceMsgLimit() { // Will check the bytes limit and drop msgs if needed. // Lock should be held. func (fs *fileStore) enforceBytesLimit() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if fs.cfg.Discard != DiscardOld { return } @@ -4359,6 +4565,16 @@ func (fs *fileStore) enforceBytesLimit() { // will most likely only be the last one, so can take a more conservative approach. // Lock should be held. func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + start := time.Now() defer func() { if took := time.Since(start); took > time.Minute { @@ -4467,6 +4683,12 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) { // Lock should be held. func (fs *fileStore) deleteFirstMsg() (bool, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return false, ErrNoLockHeld + } return fs.removeMsgViaLimits(fs.state.FirstSeq) } @@ -4474,6 +4696,12 @@ func (fs *fileStore) deleteFirstMsg() (bool, error) { // do not force the system to update the index file. // Lock should be held. func (fs *fileStore) removeMsgViaLimits(seq uint64) (bool, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return false, ErrNoLockHeld + } return fs.removeMsg(seq, false, true, false) } @@ -4490,6 +4718,16 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) { // Convenience function to remove per subject tracking at the filestore level. // Lock should be held. Returns if we deleted the last message on the subject. func (fs *fileStore) removePerSubject(subj string, marker bool) bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if len(subj) == 0 || fs.psim == nil { return false } @@ -4734,6 +4972,15 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // If we compacted before but rbytes didn't improve much, guard against constantly compacting. // Lock should be held. func (mb *msgBlock) shouldCompactInline() bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } return mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && (mb.cbytes == 0 || mb.bytes*2 < mb.cbytes) } @@ -4742,6 +4989,15 @@ func (mb *msgBlock) shouldCompactInline() bool { // Ignores 2MB minimum. // Lock should be held. func (mb *msgBlock) shouldCompactSync() bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } return mb.bytes*2 < mb.rbytes && !mb.noCompact } @@ -4883,6 +5139,13 @@ func (mb *msgBlock) compactWithFloor(floor uint64) { // Grab info from a slot. // Lock should be held. func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return 0, 0, false, ErrNoLockHeld + } + if mb.cache == nil || slot >= len(mb.cache.idx) { return 0, 0, false, errPartialCache } @@ -5022,6 +5285,12 @@ func (mb *msgBlock) flushLoop(fch, qch chan struct{}) { // Lock should be held. func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } var le = binary.LittleEndian var hdr [msgHdrSize]byte @@ -5211,6 +5480,16 @@ func (mb *msgBlock) isEmpty() bool { // Lock should be held. func (mb *msgBlock) selectNextFirst() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + var seq uint64 fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) for seq = fseq + 1; seq <= lseq; seq++ { @@ -5250,6 +5529,16 @@ func (mb *msgBlock) selectNextFirst() { // Select the next FirstSeq // Lock should be held. func (fs *fileStore) selectNextFirst() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if len(fs.blks) > 0 { mb := fs.blks[0] mb.mu.RLock() @@ -5267,6 +5556,16 @@ func (fs *fileStore) selectNextFirst() { // Lock should be held. func (mb *msgBlock) resetCacheExpireTimer(td time.Duration) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + if td == 0 { td = mb.cexp + 100*time.Millisecond } @@ -5279,12 +5578,30 @@ func (mb *msgBlock) resetCacheExpireTimer(td time.Duration) { // Lock should be held. func (mb *msgBlock) startCacheExpireTimer() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } mb.resetCacheExpireTimer(0) } // Used when we load in a message block. // Lock should be held. func (mb *msgBlock) clearCacheAndOffset() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } // Reset linear scan tracker. mb.llseq = 0 if mb.cache != nil { @@ -5296,6 +5613,15 @@ func (mb *msgBlock) clearCacheAndOffset() { // Lock should be held. func (mb *msgBlock) clearCache() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } if mb.ctmr != nil { tsla := mb.sinceLastActivity() if mb.fss == nil || tsla > mb.fexp { @@ -5349,6 +5675,16 @@ func (mb *msgBlock) tryForceExpireCacheLocked() { // So we want to bypass the Pools here. // Lock should be held. func (mb *msgBlock) tryExpireWriteCache() []byte { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + if mb.cache == nil { return nil } @@ -5370,6 +5706,16 @@ func (mb *msgBlock) tryExpireWriteCache() []byte { // Lock should be held. func (mb *msgBlock) expireCacheLocked() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + if mb.cache == nil && mb.fss == nil { if mb.ctmr != nil { mb.ctmr.Stop() @@ -5426,6 +5772,16 @@ func (fs *fileStore) startAgeChk() { // Lock should be held. func (fs *fileStore) resetAgeChk(delta int64) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + var next int64 = math.MaxInt64 if fs.ttls != nil { next = fs.ttls.GetNextExpiration(next) @@ -5471,6 +5827,16 @@ func (fs *fileStore) resetAgeChk(delta int64) { // Lock should be held. func (fs *fileStore) cancelAgeChk() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if fs.ageChk != nil { fs.ageChk.Stop() fs.ageChk = nil @@ -5602,6 +5968,16 @@ func (fs *fileStore) expireMsgs() { // Lock should be held. func (fs *fileStore) checkAndFlushAllBlocks() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + for _, mb := range fs.blks { if mb.pendingWriteSize() > 0 { // Since fs lock is held need to pull this apart in case we need to rebuild state. @@ -5641,6 +6017,13 @@ func (fs *fileStore) checkMsgs() *LostStreamData { // Lock should be held. func (mb *msgBlock) enableForWriting(fip bool) error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } + if mb == nil { return errNoMsgBlk } @@ -5845,6 +6228,13 @@ func (mb *msgBlock) closeFDsLockedNoCheck() { // This marks we are in flush and will return nil if asked again until cleared. // Lock should be held. func (mb *msgBlock) bytesPending() ([]byte, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, ErrNoLockHeld + } + if mb == nil || mb.mfd == nil { return nil, errNoPending } @@ -5872,6 +6262,16 @@ func (mb *msgBlock) blkSize() uint64 { // Update accounting on a write msg. // Lock should be held. func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + isDeleted := seq&ebit != 0 if isDeleted { seq = seq &^ ebit @@ -5896,6 +6296,13 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte) (uint64, error) { var err error + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return 0, ErrNoLockHeld + } + // Get size for this message. rl := fileStoreMsgSize(subj, hdr, msg) if rl&hbit != 0 { @@ -5927,6 +6334,13 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg // For writing tombstones to our lmb. This version will enforce maximum block sizes. // Lock should be held. func (fs *fileStore) writeTombstone(seq uint64, ts int64) error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&fs.mu) { + return ErrNoLockHeld + } + // Grab our current last message block. lmb := fs.lmb var err error @@ -6101,6 +6515,13 @@ func (mb *msgBlock) decompressIfNeeded(buf []byte) ([]byte, error) { // Lock should be held. func (mb *msgBlock) ensureRawBytesLoaded() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } + if mb.rbytes > 0 { return nil } @@ -6251,6 +6672,16 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { // Lock should be held. func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + // Check for out of range. if seq < fs.state.FirstSeq || seq > fs.state.LastSeq || fs.state.Msgs == 0 { return -1, nil @@ -6314,6 +6745,13 @@ func (fs *fileStore) selectMsgBlockForStart(minTime time.Time) *msgBlock { // Index a raw msg buffer. // Lock should be held. func (mb *msgBlock) indexCacheBuf(buf []byte) error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } + var le = binary.LittleEndian var fseq uint64 @@ -6489,6 +6927,9 @@ func (mb *msgBlock) flushPendingMsgs() error { // mb.mfd should not be nil. // Lock should held. func (mb *msgBlock) writeAt(buf []byte, woff int64) (int, error) { + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } // Used to mock write failures. if mb.mockWriteErr { // Reset on trip. @@ -6504,6 +6945,13 @@ func (mb *msgBlock) writeAt(buf []byte, woff int64) (int, error) { // flushPendingMsgsLocked writes out any messages for this message block. // Lock should be held. func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, ErrNoLockHeld + } + // Signals us that we need to rebuild filestore state. var fsLostData *LostStreamData @@ -6613,6 +7061,11 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) { // Lock should be held. func (mb *msgBlock) clearLoading() { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + hasLock(&mb.mu) + mb.loading = false } @@ -6626,6 +7079,16 @@ func (mb *msgBlock) loadMsgs() error { // Lock should be held. func (mb *msgBlock) cacheAlreadyLoaded() bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + if mb.cache == nil || mb.cache.off != 0 || mb.cache.fseq == 0 || len(mb.cache.buf) == 0 { return false } @@ -6635,18 +7098,45 @@ func (mb *msgBlock) cacheAlreadyLoaded() bool { // Lock should be held. func (mb *msgBlock) cacheNotLoaded() bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + return !mb.cacheAlreadyLoaded() } // Report if our fss is not loaded. // Lock should be held. func (mb *msgBlock) fssNotLoaded() bool { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + return mb.fss == nil && !mb.noTrack } // Wrap openBlock for the gated semaphore processing. // Lock should be held func (mb *msgBlock) openBlock() (*os.File, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, ErrNoLockHeld + } + // Gate with concurrent IO semaphore. <-dios f, err := os.Open(mb.mfn) @@ -6657,6 +7147,13 @@ func (mb *msgBlock) openBlock() (*os.File, error) { // Used to load in the block contents. // Lock should be held and all conditionals satisfied prior. func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, ErrNoLockHeld + } + var f *os.File // Re-use if we have mfd open. if mb.mfd != nil { @@ -6715,6 +7212,13 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) { // Lock should be held. func (mb *msgBlock) loadMsgsWithLock() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } + // Check for encryption, we do not load keys on startup anymore so might need to load them here. if mb.fs != nil && mb.fs.prf != nil && (mb.aek == nil || mb.bek == nil) { if err := mb.fs.loadEncryptionForMsgBlock(mb); err != nil { @@ -6889,6 +7393,13 @@ const ( // Will do a lookup from cache. // Lock should be held. func (mb *msgBlock) cacheLookup(seq uint64, sm *StoreMsg) (*StoreMsg, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, ErrNoLockHeld + } + if seq < atomic.LoadUint64(&mb.first.seq) || seq > atomic.LoadUint64(&mb.last.seq) { return nil, ErrStoreMsgNotFound } @@ -7030,6 +7541,13 @@ func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) { // Internal function to return msg parts from a raw buffer. // Lock should be held. func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return nil, ErrNoLockHeld + } + if len(buf) < emptyRecordLen { return nil, errBadMsg } @@ -7364,6 +7882,16 @@ func (fs *fileStore) Type() StorageType { // Returns number of subjects in this store. // Lock should be held. func (fs *fileStore) numSubjects() int { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + return fs.psim.Size() } @@ -8215,6 +8743,16 @@ func (mb *msgBlock) tombs() []msgId { // Return all active tombstones in this msgBlock. // Write lock should be held. func (mb *msgBlock) tombsLocked() []msgId { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { return nil @@ -8276,10 +8814,13 @@ func (fs *fileStore) Truncate(seq uint64) error { // Set lmb to nlmb and make sure writeable. fs.lmb = nlmb + nlmb.mu.Lock() if err := nlmb.enableForWriting(fs.fip); err != nil { fs.mu.Unlock() return err } + nlmb.mu.Unlock() + // Collect all tombstones, we want to put these back so we can survive // a restore without index.db properly. var tombs []msgId @@ -8372,6 +8913,16 @@ func (fs *fileStore) numMsgBlocks() int { // Will add a new msgBlock. // Lock should be held. func (fs *fileStore) addMsgBlock(mb *msgBlock) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + fs.blks = append(fs.blks, mb) fs.lmb = mb fs.bim[mb.index] = mb @@ -8464,6 +9015,16 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { // Remove a seq from the fss and select new first. // Lock should be held. func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&mb.mu) { + defer logUnLock(&mb.mu) + } + mb.ensurePerSubjectInfoLoaded() if mb.fss == nil { return @@ -8601,6 +9162,16 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { // Lock should be held. func (fs *fileStore) resetGlobalPerSubjectInfo() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + // Clear any global subject state. fs.psim, fs.tsl = fs.psim.Empty(), 0 for _, mb := range fs.blks { @@ -8610,6 +9181,12 @@ func (fs *fileStore) resetGlobalPerSubjectInfo() { // Lock should be held. func (mb *msgBlock) resetPerSubjectInfo() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } mb.fss = nil return mb.generatePerSubjectInfo() } @@ -8617,6 +9194,13 @@ func (mb *msgBlock) resetPerSubjectInfo() error { // generatePerSubjectInfo will generate the per subject info via the raw msg block. // Lock should be held. func (mb *msgBlock) generatePerSubjectInfo() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } + // Check if this mb is empty. This can happen when its the last one and we are holding onto it for seq and timestamp info. if mb.msgs == 0 { return nil @@ -8678,6 +9262,13 @@ func (mb *msgBlock) generatePerSubjectInfo() error { // Helper to make sure fss loaded if we are tracking. // Lock should be held func (mb *msgBlock) ensurePerSubjectInfoLoaded() error { + // Used to determine if a lock is already held or not, as this function + // states that a lock should be held. If no lock is held, the function will + // error with ErrNoLockHeld + if !hasLock(&mb.mu) { + return ErrNoLockHeld + } + if mb.fss != nil || mb.noTrack { if mb.fss != nil { // Mark fss activity. @@ -8843,6 +9434,16 @@ func (fs *fileStore) Delete() error { // Lock should be held. func (fs *fileStore) setSyncTimer() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if fs.syncTmr != nil { fs.syncTmr.Reset(fs.fcfg.SyncInterval) } else { @@ -8856,6 +9457,16 @@ func (fs *fileStore) setSyncTimer() { // Lock should be held. func (fs *fileStore) cancelSyncTimer() { + // Used to preemptively obtain a lock if not already held, as this function + // states that a lock should be held. MUTEX_CHECK_DEBUG_FILE can be used to + // log instances where a lock was obtained when it was already expected + // (i.e. this function was called without a lock being held). This is used + // in place of hasLock and returning an error as this function does not have + // error as one of it's return types + if !preEmptLock(&fs.mu) { + defer logUnLock(&fs.mu) + } + if fs.syncTmr != nil { fs.syncTmr.Stop() fs.syncTmr = nil @@ -9786,6 +10397,9 @@ func (o *consumerFileStore) convertCipher() error { // Kick flusher for this consumer. // Lock should be held. func (o *consumerFileStore) kickFlusher() { + // Used to debug where a function is expecting a lock, but doesn't return an + // error to savely handle a lack of lock + if o.fch != nil { select { case o.fch <- struct{}{}: diff --git a/server/opts.go b/server/opts.go index 3cf8f23aa5e..abbb7d673b2 100644 --- a/server/opts.go +++ b/server/opts.go @@ -453,6 +453,8 @@ type Options struct { // configDigest represents the state of configuration. configDigest string + + // mutex debugging options } // WebsocketOpts are options for websocket diff --git a/server/store.go b/server/store.go index 53dab93871e..488e51bc503 100644 --- a/server/store.go +++ b/server/store.go @@ -68,6 +68,8 @@ var ( ErrCorruptStreamState = errors.New("stream state snapshot is corrupt") // ErrTooManyResults ErrTooManyResults = errors.New("too many matching results for request") + // ErrNoLockHeld + ErrNoLockHeld = errors.New("expected lock was not held") ) // StoreMsg is the stored message format for messages that are retained by the Store layer. diff --git a/server/util.go b/server/util.go index f9fd695c328..2ad202f4370 100644 --- a/server/util.go +++ b/server/util.go @@ -19,12 +19,16 @@ import ( "encoding/json" "errors" "fmt" + "log" "math" "net" "net/url" + "os" "reflect" + "runtime" "strconv" "strings" + "sync" "time" ) @@ -340,3 +344,184 @@ func generateInfoJSON(info *Info) []byte { pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)} return bytes.Join(pcs, []byte(" ")) } + +// hasLock tests if a lock is held on the supplied RWMutex, so that functions +// which require a lock to be held by their caller can check if that lock is +// indeed held or not. +// +// hasLock is non-blocking and so if a lock is already held it will immediately +// return true or false depending on if the lock is held or not +// +// hasLock does not log anything even when MUTEX_CHECK_DEBUG_FILE is set as it +// is intended to assist with logic in the rest of the code. To capture +// debugging for where a lock is expected, please use preEmptLock +func hasLock(lock *sync.RWMutex) bool { + // May want to switch this to being part of the main configuration instead + // of an ENV VAR if this is something that sticks around + logfile, ok := os.LookupEnv("MUTEX_CHECK_DEBUG_FILE") + if ok { + return hasLockthing(lock, false, &logfile) + } + return hasLockthing(lock, false, nil) +} + +// preEmptLock is the same as hasLock, except that if it receives a lock it does +// not unlock it, allowing the calling function to effectively obtain a mutex +// lock if it was incorrectly called without one being held. The calling +// function will need to unlock the mutex when complete +// +// preEmptLock is non-blocking and so if a lock is already held it will immediately +// return true or false depending on if the lock is held or not +// +// Setting the environment variable MUTEX_CHECK_DEBUG_FILE causes hasLock to +// append a line to the filename set in the variable which contains a JSON'ified +// stacktrace of what called it, so that it is simple to debug where errant +// locks may lie +func preEmptLock(lock *sync.RWMutex) bool { + // May want to switch this to being part of the main configuration instead + // of an ENV VAR if this is something that sticks around + logfile, ok := os.LookupEnv("MUTEX_CHECK_DEBUG_FILE") + if ok { + return hasLockthing(lock, true, &logfile) + } + return hasLockthing(lock, true, nil) +} + +// hasLockthing is the function which unlies hasLock and preEmptLock, which +// should be called in preference to calling this directly. +func hasLockthing(lock *sync.RWMutex, keepLock bool, logfile *string) bool { + // TryLock is non-blocking and so this can be used to test if a lock is held + // by something already. If we can get a lock, then none was already held, + // if we can't then something already has one + if !lock.TryLock() { + // We didn't get a lock, therefore something else has it. Therefore we + // don't need to release it and can immediately return true + return true + } + + // We obtained a lock, which means nothing else had a lock. Therefore we + // can (and should) safely unlock it again (as we are now the one with the + // lock). We can do this immediately as we were only locking to test, not + // to actually use it + if !keepLock { + lock.Unlock() + } + + // Probably only want to use this for debugging, as using `runtime` for + // normal logging is probably not a great idea. However this allows us to + // trace back calls to `hasLock` to determine why a function expecting a + // lock to be held has been called without one being held + if logfile != nil { + var ( + output trace + ) + pc := make([]uintptr, 32) + callers := runtime.Callers(1, pc) + // Can't use range here as we risk a nil pointer deref if the slice isn't full + for i := 0; i < callers; i++ { + // Work our way back through the stack trace + runtimeFunc := runtime.FuncForPC(pc[i]) + file, line := runtimeFunc.FileLine(pc[i]) + + // Will use this to create JSON output to make it easier to read/parse + output.Trace = append(output.Trace, traceEntry{ + Depth: i, + File: file, + Line: line, + Function: runtimeFunc.Name(), + }) + } + + outStr, err := json.Marshal(output) + if err != nil { + outStr = []byte(fmt.Sprintf("could not marshal output, err=[%s], output=[%+v]", err, output)) + } + outStr = append(outStr, '\n') + + logHandle, err := os.OpenFile(*logfile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + log.Printf("hasLock could not write to mutex log, err=[%s]", err) + } + defer logHandle.Close() + _, err = logHandle.WriteString(string(outStr)) + if err != nil { + log.Printf("hasLock could not write to mutex log, err=[%s]", err) + } + } + + // We can state that nothing had a lock, because we got one, and so can return false + return false +} + +// logUnLock simply unlocks a held lock, but with additional logging to pair +// with preEmptLock and hasLock. e.g. you can check that the log contains both +// locks and unlocks to ensure that all paths are covered +func logUnLock(lock *sync.RWMutex) { + doubleUnlock := false + + // Unlock the lock *if* it was held + if lock.TryLock() { + // Hmmmm, got a lock when we were trying to unlock + doubleUnlock = true + } + + // Either the lock was held already and we want to unlock it, or it was not + // and TryLock obtained it, so either way we want to unlock :) + lock.Unlock() + + logfile, ok := os.LookupEnv("MUTEX_CHECK_DEBUG_FILE") + if ok { + var ( + output trace + ) + if doubleUnlock { + output.Comments = "attempted to double unlock" + } + pc := make([]uintptr, 32) + callers := runtime.Callers(1, pc) + // Can't use range here as we risk a nil pointer deref if the slice isn't full + for i := 0; i < callers; i++ { + // Work our way back through the stack trace + runtimeFunc := runtime.FuncForPC(pc[i]) + file, line := runtimeFunc.FileLine(pc[i]) + + // Will use this to create JSON output to make it easier to read/parse + output.Trace = append(output.Trace, traceEntry{ + Depth: i, + File: file, + Line: line, + Function: runtimeFunc.Name(), + }) + } + + outStr, err := json.Marshal(output) + if err != nil { + outStr = []byte(fmt.Sprintf("could not marshal output, err=[%s], output=[%+v]", err, output)) + } + outStr = append(outStr, '\n') + + logHandle, err := os.OpenFile(logfile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + log.Printf("logUnLock could not write to mutex log, err=[%s]", err) + } + defer logHandle.Close() + _, err = logHandle.WriteString(string(outStr)) + if err != nil { + log.Printf("logUnLock could not write to mutex log, err=[%s]", err) + } + } + +} + +// trace and traceEntry are types used to construct the json'ified stack trace +type trace struct { + Trace []traceEntry `json:"trace"` + Comments string `json:"comments,omitempty"` +} + +type traceEntry struct { + Depth int `json:"depth"` + File string `json:"file"` + Line int `json:"line"` + Function string `json:"function"` +}