Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Fixed a bug that could cause filestore LoadNextMsg to skip valid msgs. #5266

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 13 additions & 42 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,6 +2286,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
subs = append(subs, subj)
}
}
// Check if we matched anything
if len(subs) == 0 {
return nil, didLoad, ErrStoreMsgNotFound
}
}
fseq = lseq + 1
for _, subj := range subs {
Expand All @@ -2304,17 +2308,17 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}
}

if fseq > lseq {
return nil, didLoad, ErrStoreMsgNotFound
}

// If we guess to not do a linear scan, but the above resulted in alot of subs that will
// need to be checked for every scanned message, revert.
// TODO(dlc) - we could memoize the subs across calls.
if len(subs) > int(lseq-fseq) {
doLinearScan = true
}

if fseq > lseq {
return nil, didLoad, ErrStoreMsgNotFound
}

// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
Expand Down Expand Up @@ -3774,15 +3778,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
secure = false
}

if fs.state.Msgs == 0 {
var err = ErrStoreEOF
if seq <= fs.state.LastSeq {
err = ErrStoreMsgNotFound
}
fsUnlock()
return false, err
}

mb := fs.selectMsgBlock(seq)
if mb == nil {
var err = ErrStoreEOF
Expand All @@ -3795,15 +3790,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (

mb.mu.Lock()

// See if we are closed or the sequence number is still relevant.
if mb.closed || seq < atomic.LoadUint64(&mb.first.seq) {
mb.mu.Unlock()
fsUnlock()
return false, nil
}

// Now check dmap if it is there.
if mb.dmap.Exists(seq) {
// See if we are closed or the sequence number is still relevant or if we know its deleted.
if mb.closed || seq < atomic.LoadUint64(&mb.first.seq) || mb.dmap.Exists(seq) {
mb.mu.Unlock()
fsUnlock()
return false, nil
Expand All @@ -3813,27 +3801,11 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
// Now just load regardless.
// TODO(dlc) - Figure out a way not to have to load it in, we need subject tracking outside main data block.
if mb.cacheNotLoaded() {
// We do not want to block possible activity within another msg block.
// We have to unlock both locks and acquire the mb lock in the loadMsgs() call to avoid a deadlock if another
// go routine was trying to get fs then this mb lock at the same time. E.g. another call to remove for same block.
mb.mu.Unlock()
fsUnlock()
if err := mb.loadMsgs(); err != nil {
return false, err
}
fsLock()
// We need to check if things changed out from underneath us.
if fs.closed {
fsUnlock()
return false, ErrStoreClosed
}
mb.mu.Lock()
if mb.closed || seq < atomic.LoadUint64(&mb.first.seq) {
if err := mb.loadMsgsWithLock(); err != nil {
mb.mu.Unlock()
fsUnlock()
return false, nil
return false, err
}
// cacheLookup below will do dmap check so no need to repeat here.
}

var smv StoreMsg
Expand Down Expand Up @@ -5288,7 +5260,7 @@ func (fs *fileStore) selectMsgBlock(seq uint64) *msgBlock {
// Lock should be held.
func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
// Check for out of range.
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq {
if seq < fs.state.FirstSeq || seq > fs.state.LastSeq || fs.state.Msgs == 0 {
return -1, nil
}

Expand Down Expand Up @@ -7234,7 +7206,6 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si

// Mark first as updated.
ss.firstNeedsUpdate = false
startSeq++

startSlot := int(startSeq - mb.cache.fseq)
if startSlot >= len(mb.cache.idx) {
Expand Down
2 changes: 0 additions & 2 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6071,8 +6071,6 @@ func TestFileStoreTrackSubjLenForPSIM(t *testing.T) {

// This was used to make sure our estimate was correct, but not needed normally.
func TestFileStoreLargeFullStatePSIM(t *testing.T) {
t.Skip()

sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd},
Expand Down