diff --git a/server/filestore.go b/server/filestore.go index 73eee4ac7f6..8552c84b110 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2286,6 +2286,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor subs = append(subs, subj) } } + // Check if we matched anything + if len(subs) == 0 { + return nil, didLoad, ErrStoreMsgNotFound + } } fseq = lseq + 1 for _, subj := range subs { @@ -2304,6 +2308,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } } + if fseq > lseq { + return nil, didLoad, ErrStoreMsgNotFound + } + // If we guess to not do a linear scan, but the above resulted in alot of subs that will // need to be checked for every scanned message, revert. // TODO(dlc) - we could memoize the subs across calls. @@ -2311,10 +2319,6 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor doLinearScan = true } - if fseq > lseq { - return nil, didLoad, ErrStoreMsgNotFound - } - // Need messages loaded from here on out. if mb.cacheNotLoaded() { if err := mb.loadMsgsWithLock(); err != nil { @@ -3774,15 +3778,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( secure = false } - if fs.state.Msgs == 0 { - var err = ErrStoreEOF - if seq <= fs.state.LastSeq { - err = ErrStoreMsgNotFound - } - fsUnlock() - return false, err - } - mb := fs.selectMsgBlock(seq) if mb == nil { var err = ErrStoreEOF @@ -3795,15 +3790,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( mb.mu.Lock() - // See if we are closed or the sequence number is still relevant. - if mb.closed || seq < atomic.LoadUint64(&mb.first.seq) { - mb.mu.Unlock() - fsUnlock() - return false, nil - } - - // Now check dmap if it is there. - if mb.dmap.Exists(seq) { + // See if we are closed or the sequence number is still relevant or if we know its deleted. + if mb.closed || seq < atomic.LoadUint64(&mb.first.seq) || mb.dmap.Exists(seq) { mb.mu.Unlock() fsUnlock() return false, nil @@ -3813,27 +3801,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // Now just load regardless. // TODO(dlc) - Figure out a way not to have to load it in, we need subject tracking outside main data block. if mb.cacheNotLoaded() { - // We do not want to block possible activity within another msg block. - // We have to unlock both locks and acquire the mb lock in the loadMsgs() call to avoid a deadlock if another - // go routine was trying to get fs then this mb lock at the same time. E.g. another call to remove for same block. - mb.mu.Unlock() - fsUnlock() - if err := mb.loadMsgs(); err != nil { - return false, err - } - fsLock() - // We need to check if things changed out from underneath us. - if fs.closed { - fsUnlock() - return false, ErrStoreClosed - } - mb.mu.Lock() - if mb.closed || seq < atomic.LoadUint64(&mb.first.seq) { + if err := mb.loadMsgsWithLock(); err != nil { mb.mu.Unlock() fsUnlock() - return false, nil + return false, err } - // cacheLookup below will do dmap check so no need to repeat here. } var smv StoreMsg @@ -5288,7 +5260,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock { // Lock should be held. func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { // Check for out of range. - if seq < fs.state.FirstSeq || seq > fs.state.LastSeq { + if seq < fs.state.FirstSeq || seq > fs.state.LastSeq || fs.state.Msgs == 0 { return -1, nil } @@ -7234,7 +7206,6 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si // Mark first as updated. ss.firstNeedsUpdate = false - startSeq++ startSlot := int(startSeq - mb.cache.fseq) if startSlot >= len(mb.cache.idx) { diff --git a/server/filestore_test.go b/server/filestore_test.go index 45370cf7bb6..d70d4e4f25b 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6071,8 +6071,6 @@ func TestFileStoreTrackSubjLenForPSIM(t *testing.T) { // This was used to make sure our estimate was correct, but not needed normally. func TestFileStoreLargeFullStatePSIM(t *testing.T) { - t.Skip() - sd := t.TempDir() fs, err := newFileStore( FileStoreConfig{StoreDir: sd},