@@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
2315
2315
fseq = lseq + 1
2316
2316
for _ , subj := range subs {
2317
2317
ss , _ := mb .fss .Find (stringToBytes (subj ))
2318
- if ss != nil && ( ss .firstNeedsUpdate || ss . lastNeedsUpdate ) {
2319
- mb .recalculateForSubj (subj , ss )
2318
+ if ss != nil && ss .firstNeedsUpdate {
2319
+ mb .recalculateFirstForSubj (subj , ss . First , ss )
2320
2320
}
2321
2321
if ss == nil || start > ss .Last || ss .First >= fseq {
2322
2322
continue
@@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
2445
2445
// If we already found a partial then don't do anything else.
2446
2446
return
2447
2447
}
2448
- if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
2449
- mb .recalculateForSubj (bytesToString (bsubj ), ss )
2448
+ if ss .firstNeedsUpdate {
2449
+ mb .recalculateFirstForSubj (bytesToString (bsubj ), ss . First , ss )
2450
2450
}
2451
2451
if sseq <= ss .First {
2452
2452
update (ss )
@@ -2616,6 +2616,10 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
2616
2616
// Always reset.
2617
2617
ss .First , ss .Last , ss .Msgs = 0 , 0 , 0
2618
2618
2619
+ if filter == _EMPTY_ {
2620
+ filter = fwcs
2621
+ }
2622
+
2619
2623
// We do need to figure out the first and last sequences.
2620
2624
wc := subjectHasWildcard (filter )
2621
2625
start , stop := uint32 (math .MaxUint32 ), uint32 (0 )
@@ -2745,8 +2749,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
2745
2749
mb .lsts = time .Now ().UnixNano ()
2746
2750
mb .fss .Match (stringToBytes (subject ), func (bsubj []byte , ss * SimpleState ) {
2747
2751
subj := string (bsubj )
2748
- if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
2749
- mb .recalculateForSubj (subj , ss )
2752
+ if ss .firstNeedsUpdate {
2753
+ mb .recalculateFirstForSubj (subj , ss . First , ss )
2750
2754
}
2751
2755
oss := fss [subj ]
2752
2756
if oss .First == 0 { // New
@@ -2936,8 +2940,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
2936
2940
return
2937
2941
}
2938
2942
subj := bytesToString (bsubj )
2939
- if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
2940
- mb .recalculateForSubj (subj , ss )
2943
+ if ss .firstNeedsUpdate {
2944
+ mb .recalculateFirstForSubj (subj , ss . First , ss )
2941
2945
}
2942
2946
if sseq <= ss .First {
2943
2947
t += ss .Msgs
@@ -3224,8 +3228,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
3224
3228
// If we already found a partial then don't do anything else.
3225
3229
return
3226
3230
}
3227
- if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
3228
- mb .recalculateForSubj (subj , ss )
3231
+ if ss .firstNeedsUpdate {
3232
+ mb .recalculateFirstForSubj (subj , ss . First , ss )
3229
3233
}
3230
3234
if sseq <= ss .First {
3231
3235
t += ss .Msgs
@@ -3898,8 +3902,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
3898
3902
info .fblk = i
3899
3903
}
3900
3904
}
3901
- if ss .firstNeedsUpdate || ss . lastNeedsUpdate {
3902
- mb .recalculateForSubj (subj , ss )
3905
+ if ss .firstNeedsUpdate {
3906
+ mb .recalculateFirstForSubj (subj , ss . First , ss )
3903
3907
}
3904
3908
mb .mu .Unlock ()
3905
3909
// Re-acquire fs lock
@@ -4030,8 +4034,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
4030
4034
mb .mu .Lock ()
4031
4035
mb .ensurePerSubjectInfoLoaded ()
4032
4036
ss , ok := mb .fss .Find (stringToBytes (subj ))
4033
- if ok && ss != nil && ( ss .firstNeedsUpdate || ss . lastNeedsUpdate ) {
4034
- mb .recalculateForSubj (subj , ss )
4037
+ if ok && ss != nil && ss .firstNeedsUpdate {
4038
+ mb .recalculateFirstForSubj (subj , ss . First , ss )
4035
4039
}
4036
4040
mb .mu .Unlock ()
4037
4041
if ss == nil {
@@ -7832,115 +7836,67 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
7832
7836
7833
7837
ss .Msgs --
7834
7838
7835
- // We can lazily calculate the first/last sequence when needed.
7839
+ // Only one left.
7840
+ if ss .Msgs == 1 {
7841
+ if seq == ss .Last {
7842
+ ss .Last = ss .First
7843
+ } else {
7844
+ ss .First = ss .Last
7845
+ }
7846
+ ss .firstNeedsUpdate = false
7847
+ return
7848
+ }
7849
+
7850
+ // We can lazily calculate the first sequence when needed.
7836
7851
ss .firstNeedsUpdate = seq == ss .First || ss .firstNeedsUpdate
7837
- ss .lastNeedsUpdate = seq == ss .Last || ss .lastNeedsUpdate
7838
7852
}
7839
7853
7840
- // Will recalculate the first and/or last sequence for this subject in this block.
7854
+ // Will recalulate the first sequence for this subject in this block.
7841
7855
// Will avoid slower path message lookups and scan the cache directly instead.
7842
- func (mb * msgBlock ) recalculateForSubj (subj string , ss * SimpleState ) {
7856
+ func (mb * msgBlock ) recalculateFirstForSubj (subj string , startSeq uint64 , ss * SimpleState ) {
7843
7857
// Need to make sure messages are loaded.
7844
7858
if mb .cacheNotLoaded () {
7845
7859
if err := mb .loadMsgsWithLock (); err != nil {
7846
7860
return
7847
7861
}
7848
7862
}
7849
7863
7850
- startSlot := int ( ss . First - mb . cache . fseq )
7851
- if startSlot < 0 {
7852
- startSlot = 0
7853
- }
7864
+ // Mark first as updated.
7865
+ ss . firstNeedsUpdate = false
7866
+
7867
+ startSlot := int ( startSeq - mb . cache . fseq )
7854
7868
if startSlot >= len (mb .cache .idx ) {
7855
7869
ss .First = ss .Last
7856
7870
return
7857
- }
7858
- endSlot := int (ss .Last - mb .cache .fseq )
7859
- if endSlot < 0 {
7860
- endSlot = 0
7861
- }
7862
- if endSlot >= len (mb .cache .idx ) || startSlot > endSlot {
7863
- return
7871
+ } else if startSlot < 0 {
7872
+ startSlot = 0
7864
7873
}
7865
7874
7866
7875
var le = binary .LittleEndian
7867
- if ss .firstNeedsUpdate {
7868
- // Mark first as updated.
7869
- ss .firstNeedsUpdate = false
7870
-
7871
- fseq := ss .First + 1
7872
- if mbFseq := atomic .LoadUint64 (& mb .first .seq ); fseq < mbFseq {
7873
- fseq = mbFseq
7874
- }
7875
- for slot := startSlot ; slot < len (mb .cache .idx ); slot ++ {
7876
- bi := mb .cache .idx [slot ] &^ hbit
7877
- if bi == dbit {
7878
- // delete marker so skip.
7879
- continue
7880
- }
7881
- li := int (bi ) - mb .cache .off
7882
- if li >= len (mb .cache .buf ) {
7883
- ss .First = ss .Last
7884
- return
7885
- }
7886
- buf := mb .cache .buf [li :]
7887
- hdr := buf [:msgHdrSize ]
7888
- slen := int (le .Uint16 (hdr [20 :]))
7889
- if subj == bytesToString (buf [msgHdrSize :msgHdrSize + slen ]) {
7890
- seq := le .Uint64 (hdr [4 :])
7891
- if seq < fseq || seq & ebit != 0 || mb .dmap .Exists (seq ) {
7892
- continue
7893
- }
7894
- ss .First = seq
7895
- if ss .Msgs == 1 {
7896
- ss .Last = seq
7897
- ss .lastNeedsUpdate = false
7898
- return
7899
- }
7900
- // Skip the start slot ahead, if we need to recalculate last we can stop early.
7901
- startSlot = slot
7902
- break
7903
- }
7876
+ for slot , fseq := startSlot , atomic .LoadUint64 (& mb .first .seq ); slot < len (mb .cache .idx ); slot ++ {
7877
+ bi := mb .cache .idx [slot ] &^ hbit
7878
+ if bi == dbit {
7879
+ // delete marker so skip.
7880
+ continue
7904
7881
}
7905
- }
7906
- if ss .lastNeedsUpdate {
7907
- // Mark last as updated.
7908
- ss .lastNeedsUpdate = false
7909
-
7910
- lseq := ss .Last - 1
7911
- if mbLseq := atomic .LoadUint64 (& mb .last .seq ); lseq > mbLseq {
7912
- lseq = mbLseq
7882
+ li := int (bi ) - mb .cache .off
7883
+ if li >= len (mb .cache .buf ) {
7884
+ ss .First = ss .Last
7885
+ return
7913
7886
}
7914
- for slot := endSlot ; slot >= startSlot ; slot -- {
7915
- bi := mb .cache .idx [slot ] &^ hbit
7916
- if bi == dbit {
7917
- // delete marker so skip.
7887
+ buf := mb .cache .buf [li :]
7888
+ hdr := buf [:msgHdrSize ]
7889
+ slen := int (le .Uint16 (hdr [20 :]))
7890
+ if subj == bytesToString (buf [msgHdrSize :msgHdrSize + slen ]) {
7891
+ seq := le .Uint64 (hdr [4 :])
7892
+ if seq < fseq || seq & ebit != 0 || mb .dmap .Exists (seq ) {
7918
7893
continue
7919
7894
}
7920
- li := int (bi ) - mb .cache .off
7921
- if li >= len (mb .cache .buf ) {
7922
- // Can't overwrite ss.Last, just skip.
7923
- return
7924
- }
7925
- buf := mb .cache .buf [li :]
7926
- hdr := buf [:msgHdrSize ]
7927
- slen := int (le .Uint16 (hdr [20 :]))
7928
- if subj == bytesToString (buf [msgHdrSize :msgHdrSize + slen ]) {
7929
- seq := le .Uint64 (hdr [4 :])
7930
- if seq > lseq || seq & ebit != 0 || mb .dmap .Exists (seq ) {
7931
- continue
7932
- }
7933
- // Sequence should never be lower, but guard against it nonetheless.
7934
- if seq < ss .First {
7935
- seq = ss .First
7936
- }
7895
+ ss .First = seq
7896
+ if ss .Msgs == 1 {
7937
7897
ss .Last = seq
7938
- if ss .Msgs == 1 {
7939
- ss .First = seq
7940
- ss .firstNeedsUpdate = false
7941
- }
7942
- return
7943
7898
}
7899
+ return
7944
7900
}
7945
7901
}
7946
7902
}
0 commit comments