Skip to content

Commit 240fce9

Browse files
MauriceVanVeenwallyqs
authored andcommitted
[FIXED] ss.Last was not kept up-to-date
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 45b4857 commit 240fce9

File tree

4 files changed

+150
-71
lines changed

4 files changed

+150
-71
lines changed

server/filestore.go

+100-45
Original file line numberDiff line numberDiff line change
@@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
23152315
fseq = lseq + 1
23162316
for _, subj := range subs {
23172317
ss, _ := mb.fss.Find(stringToBytes(subj))
2318-
if ss != nil && ss.firstNeedsUpdate {
2319-
mb.recalculateFirstForSubj(subj, ss.First, ss)
2318+
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
2319+
mb.recalculateForSubj(subj, ss)
23202320
}
23212321
if ss == nil || start > ss.Last || ss.First >= fseq {
23222322
continue
@@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
24452445
// If we already found a partial then don't do anything else.
24462446
return
24472447
}
2448-
if ss.firstNeedsUpdate {
2449-
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
2448+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
2449+
mb.recalculateForSubj(bytesToString(bsubj), ss)
24502450
}
24512451
if sseq <= ss.First {
24522452
update(ss)
@@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
27452745
mb.lsts = time.Now().UnixNano()
27462746
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
27472747
subj := string(bsubj)
2748-
if ss.firstNeedsUpdate {
2749-
mb.recalculateFirstForSubj(subj, ss.First, ss)
2748+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
2749+
mb.recalculateForSubj(subj, ss)
27502750
}
27512751
oss := fss[subj]
27522752
if oss.First == 0 { // New
@@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
29362936
return
29372937
}
29382938
subj := bytesToString(bsubj)
2939-
if ss.firstNeedsUpdate {
2940-
mb.recalculateFirstForSubj(subj, ss.First, ss)
2939+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
2940+
mb.recalculateForSubj(subj, ss)
29412941
}
29422942
if sseq <= ss.First {
29432943
t += ss.Msgs
@@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
32243224
// If we already found a partial then don't do anything else.
32253225
return
32263226
}
3227-
if ss.firstNeedsUpdate {
3228-
mb.recalculateFirstForSubj(subj, ss.First, ss)
3227+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
3228+
mb.recalculateForSubj(subj, ss)
32293229
}
32303230
if sseq <= ss.First {
32313231
t += ss.Msgs
@@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
38983898
info.fblk = i
38993899
}
39003900
}
3901-
if ss.firstNeedsUpdate {
3902-
mb.recalculateFirstForSubj(subj, ss.First, ss)
3901+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
3902+
mb.recalculateForSubj(subj, ss)
39033903
}
39043904
mb.mu.Unlock()
39053905
// Re-acquire fs lock
@@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
40304030
mb.mu.Lock()
40314031
mb.ensurePerSubjectInfoLoaded()
40324032
ss, ok := mb.fss.Find(stringToBytes(subj))
4033-
if ok && ss != nil && ss.firstNeedsUpdate {
4034-
mb.recalculateFirstForSubj(subj, ss.First, ss)
4033+
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
4034+
mb.recalculateForSubj(subj, ss)
40354035
}
40364036
mb.mu.Unlock()
40374037
if ss == nil {
@@ -7832,60 +7832,115 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
78327832

78337833
ss.Msgs--
78347834

7835-
// We can lazily calculate the first sequence when needed.
7835+
// We can lazily calculate the first/last sequence when needed.
78367836
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
7837+
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
78377838
}
78387839

7839-
// Will recalulate the first sequence for this subject in this block.
7840+
// Will recalculate the first and/or last sequence for this subject in this block.
78407841
// Will avoid slower path message lookups and scan the cache directly instead.
7841-
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
7842+
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
78427843
// Need to make sure messages are loaded.
78437844
if mb.cacheNotLoaded() {
78447845
if err := mb.loadMsgsWithLock(); err != nil {
78457846
return
78467847
}
78477848
}
78487849

7849-
// Mark first as updated.
7850-
ss.firstNeedsUpdate = false
7851-
7852-
startSlot := int(startSeq - mb.cache.fseq)
7850+
startSlot := int(ss.First - mb.cache.fseq)
7851+
if startSlot < 0 {
7852+
startSlot = 0
7853+
}
78537854
if startSlot >= len(mb.cache.idx) {
78547855
ss.First = ss.Last
78557856
return
7856-
} else if startSlot < 0 {
7857-
startSlot = 0
78587857
}
7859-
7860-
fseq := startSeq + 1
7861-
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
7862-
fseq = mbFseq
7858+
endSlot := int(ss.Last - mb.cache.fseq)
7859+
if endSlot < 0 {
7860+
endSlot = 0
7861+
}
7862+
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
7863+
return
78637864
}
7865+
78647866
var le = binary.LittleEndian
7865-
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
7866-
bi := mb.cache.idx[slot] &^ hbit
7867-
if bi == dbit {
7868-
// delete marker so skip.
7869-
continue
7867+
if ss.firstNeedsUpdate {
7868+
// Mark first as updated.
7869+
ss.firstNeedsUpdate = false
7870+
7871+
fseq := ss.First + 1
7872+
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
7873+
fseq = mbFseq
7874+
}
7875+
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
7876+
bi := mb.cache.idx[slot] &^ hbit
7877+
if bi == dbit {
7878+
// delete marker so skip.
7879+
continue
7880+
}
7881+
li := int(bi) - mb.cache.off
7882+
if li >= len(mb.cache.buf) {
7883+
ss.First = ss.Last
7884+
return
7885+
}
7886+
buf := mb.cache.buf[li:]
7887+
hdr := buf[:msgHdrSize]
7888+
slen := int(le.Uint16(hdr[20:]))
7889+
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
7890+
seq := le.Uint64(hdr[4:])
7891+
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
7892+
continue
7893+
}
7894+
ss.First = seq
7895+
if ss.Msgs == 1 {
7896+
ss.Last = seq
7897+
ss.lastNeedsUpdate = false
7898+
return
7899+
}
7900+
// Skip the start slot ahead, if we need to recalculate last we can stop early.
7901+
startSlot = slot
7902+
break
7903+
}
78707904
}
7871-
li := int(bi) - mb.cache.off
7872-
if li >= len(mb.cache.buf) {
7873-
ss.First = ss.Last
7874-
return
7905+
}
7906+
if ss.lastNeedsUpdate {
7907+
// Mark last as updated.
7908+
ss.lastNeedsUpdate = false
7909+
7910+
lseq := ss.Last - 1
7911+
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
7912+
lseq = mbLseq
78757913
}
7876-
buf := mb.cache.buf[li:]
7877-
hdr := buf[:msgHdrSize]
7878-
slen := int(le.Uint16(hdr[20:]))
7879-
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
7880-
seq := le.Uint64(hdr[4:])
7881-
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
7914+
for slot := endSlot; slot >= startSlot; slot-- {
7915+
bi := mb.cache.idx[slot] &^ hbit
7916+
if bi == dbit {
7917+
// delete marker so skip.
78827918
continue
78837919
}
7884-
ss.First = seq
7885-
if ss.Msgs == 1 {
7920+
li := int(bi) - mb.cache.off
7921+
if li >= len(mb.cache.buf) {
7922+
// Can't overwrite ss.Last, just skip.
7923+
return
7924+
}
7925+
buf := mb.cache.buf[li:]
7926+
hdr := buf[:msgHdrSize]
7927+
slen := int(le.Uint16(hdr[20:]))
7928+
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
7929+
seq := le.Uint64(hdr[4:])
7930+
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
7931+
continue
7932+
}
7933+
// Sequence should never be lower, but guard against it nonetheless.
7934+
if seq < ss.First {
7935+
seq = ss.First
7936+
}
78867937
ss.Last = seq
7938+
if ss.Msgs == 1 {
7939+
ss.First = seq
7940+
ss.firstNeedsUpdate = false
7941+
}
7942+
return
78877943
}
7888-
return
78897944
}
78907945
}
78917946
}

server/filestore_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
50305030
mb.clearCacheAndOffset()
50315031
// Now call with start sequence of 1, the old one
50325032
// This will panic without the fix.
5033-
mb.recalculateFirstForSubj("foo", 1, ss)
5033+
mb.recalculateForSubj("foo", ss)
50345034
// Make sure it was update properly.
50355035
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
50365036
}

server/memstore.go

+47-25
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
143143
return ErrMaxBytes
144144
}
145145
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
146-
if ss.firstNeedsUpdate {
147-
ms.recalculateFirstForSubj(subj, ss.First, ss)
146+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
147+
ms.recalculateForSubj(subj, ss)
148148
}
149149
sm, ok := ms.msgs[ss.First]
150150
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
@@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
430430
var totalSkipped uint64
431431
// We will track start and end sequences as we go.
432432
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
433-
if fss.firstNeedsUpdate {
434-
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
433+
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
434+
ms.recalculateForSubj(bytesToString(subj), fss)
435435
}
436436
if sseq <= fss.First {
437437
update(fss)
@@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
585585
fss := make(map[string]SimpleState)
586586
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
587587
subjs := string(subj)
588-
if ss.firstNeedsUpdate {
589-
ms.recalculateFirstForSubj(subjs, ss.First, ss)
588+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
589+
ms.recalculateForSubj(subjs, ss)
590590
}
591591
oss := fss[subjs]
592592
if oss.First == 0 { // New
@@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
675675
var totalSkipped uint64
676676
// We will track start and end sequences as we go.
677677
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
678-
if fss.firstNeedsUpdate {
679-
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
678+
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
679+
ms.recalculateForSubj(bytesToString(subj), fss)
680680
}
681681
if sseq <= fss.First {
682682
update(fss)
@@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
793793
return
794794
}
795795
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
796-
if ss.firstNeedsUpdate {
797-
ms.recalculateFirstForSubj(subj, ss.First, ss)
796+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
797+
ms.recalculateForSubj(subj, ss)
798798
}
799799
if !ms.removeMsg(ss.First, false) {
800800
break
@@ -1267,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
12671267
if !ok {
12681268
continue
12691269
}
1270-
if ss.firstNeedsUpdate {
1271-
ms.recalculateFirstForSubj(subj, ss.First, ss)
1270+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
1271+
ms.recalculateForSubj(subj, ss)
12721272
}
12731273
if ss.First < fseq {
12741274
fseq = ss.First
@@ -1362,25 +1362,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
13621362
}
13631363
ss.Msgs--
13641364

1365-
// We can lazily calculate the first sequence when needed.
1365+
// We can lazily calculate the first/last sequence when needed.
13661366
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
1367+
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
13671368
}
13681369

1369-
// Will recalculate the first sequence for this subject in this block.
1370+
// Will recalculate the first and/or last sequence for this subject.
13701371
// Lock should be held.
1371-
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
1372-
tseq := startSeq + 1
1373-
if tseq < ms.state.FirstSeq {
1374-
tseq = ms.state.FirstSeq
1375-
}
1376-
for ; tseq <= ss.Last; tseq++ {
1377-
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
1378-
ss.First = tseq
1379-
if ss.Msgs == 1 {
1372+
func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
1373+
if ss.firstNeedsUpdate {
1374+
tseq := ss.First + 1
1375+
if tseq < ms.state.FirstSeq {
1376+
tseq = ms.state.FirstSeq
1377+
}
1378+
for ; tseq <= ss.Last; tseq++ {
1379+
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
1380+
ss.First = tseq
1381+
ss.firstNeedsUpdate = false
1382+
if ss.Msgs == 1 {
1383+
ss.Last = tseq
1384+
ss.lastNeedsUpdate = false
1385+
return
1386+
}
1387+
break
1388+
}
1389+
}
1390+
}
1391+
if ss.lastNeedsUpdate {
1392+
tseq := ss.Last - 1
1393+
if tseq > ms.state.LastSeq {
1394+
tseq = ms.state.LastSeq
1395+
}
1396+
for ; tseq >= ss.First; tseq-- {
1397+
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
13801398
ss.Last = tseq
1399+
ss.lastNeedsUpdate = false
1400+
if ss.Msgs == 1 {
1401+
ss.First = tseq
1402+
ss.firstNeedsUpdate = false
1403+
}
1404+
return
13811405
}
1382-
ss.firstNeedsUpdate = false
1383-
return
13841406
}
13851407
}
13861408
}

server/store.go

+2
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ type SimpleState struct {
166166

167167
// Internal usage for when the first needs to be updated before use.
168168
firstNeedsUpdate bool
169+
// Internal usage for when the last needs to be updated before use.
170+
lastNeedsUpdate bool
169171
}
170172

171173
// LostStreamData indicates msgs that have been lost.

0 commit comments

Comments
 (0)