Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.24-RC.2 #6257

Merged
merged 6 commits into from
Dec 13, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Revert "[FIXED] ss.Last was not kept up-to-date"
This reverts commit 0630752.
MauriceVanVeen authored and wallyqs committed Dec 13, 2024

Verified

This commit was signed with the committer’s verified signature.
wallyqs Waldemar Quevedo
commit 04339d580327c15ae02a8588b0fe9159c0f83b88
145 changes: 45 additions & 100 deletions server/filestore.go
Original file line number Diff line number Diff line change
@@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
@@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
}
if sseq <= ss.First {
update(ss)
@@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
@@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
@@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
@@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
@@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
if ss == nil {
@@ -7832,115 +7832,60 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// We can lazily calculate the first/last sequence when needed.
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first and/or last sequence for this subject in this block.
// Will recalulate the first sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
// Mark first as updated.
ss.firstNeedsUpdate = false

startSlot := int(startSeq - mb.cache.fseq)
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
}
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
} else if startSlot < 0 {
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
var le = binary.LittleEndian
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false

fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false

lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
return
}
}
}
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
@@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
mb.clearCacheAndOffset()
// Now call with start sequence of 1, the old one
// This will panic without the fix.
mb.recalculateForSubj("foo", ss)
mb.recalculateFirstForSubj("foo", 1, ss)
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}
72 changes: 25 additions & 47 deletions server/memstore.go
Original file line number Diff line number Diff line change
@@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
@@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var totalSkipped uint64
// We will track start and end sequences as we go.
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
}
if sseq <= fss.First {
update(fss)
@@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subjs, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
}
oss := fss[subjs]
if oss.First == 0 { // New
@@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
var totalSkipped uint64
// We will track start and end sequences as we go.
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
}
if sseq <= fss.First {
update(fss)
@@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if !ms.removeMsg(ss.First, false) {
break
@@ -1267,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if !ok {
continue
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss.First < fseq {
fseq = ss.First
@@ -1362,47 +1362,25 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// We can lazily calculate the first/last sequence when needed.
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first and/or last sequence for this subject.
// Will recalculate the first sequence for this subject in this block.
// Lock should be held.
func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
if ss.firstNeedsUpdate {
tseq := ss.First + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
return
}
break
}
}
}
if ss.lastNeedsUpdate {
tseq := ss.Last - 1
if tseq > ms.state.LastSeq {
tseq = ms.state.LastSeq
}
for ; tseq >= ss.First; tseq-- {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
tseq := startSeq + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
if ss.Msgs == 1 {
ss.First = tseq
ss.firstNeedsUpdate = false
}
return
}
ss.firstNeedsUpdate = false
return
}
}
}
2 changes: 0 additions & 2 deletions server/store.go
Original file line number Diff line number Diff line change
@@ -166,8 +166,6 @@ type SimpleState struct {

// Internal usage for when the first needs to be updated before use.
firstNeedsUpdate bool
// Internal usage for when the last needs to be updated before use.
lastNeedsUpdate bool
}

// LostStreamData indicates msgs that have been lost.
39 changes: 8 additions & 31 deletions server/store_test.go
Original file line number Diff line number Diff line change
@@ -152,19 +152,6 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
ss := fs.SubjectsState("foo")
return ss["foo"]
}
var smp StoreMsg
expectFirstSeq := func(eseq uint64) {
t.Helper()
sm, _, err := fs.LoadNextMsg("foo", false, 0, &smp)
require_NoError(t, err)
require_Equal(t, sm.seq, eseq)
}
expectLastSeq := func(eseq uint64) {
t.Helper()
sm, err := fs.LoadLastMsg("foo", &smp)
require_NoError(t, err)
require_Equal(t, sm.seq, eseq)
}

// Publish an initial batch of messages.
for i := 0; i < 4; i++ {
@@ -176,9 +163,7 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
ss := getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 1)
expectFirstSeq(1)
require_Equal(t, ss.Last, 4)
expectLastSeq(4)

// Remove first message, ss.First is lazy so will only mark ss.firstNeedsUpdate.
removed, err := fs.RemoveMsg(1)
@@ -189,35 +174,29 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
ss = getSubjectState()
require_Equal(t, ss.Msgs, 3)
require_Equal(t, ss.First, 2)
expectFirstSeq(2)
require_Equal(t, ss.Last, 4)
expectLastSeq(4)

// Remove last message, ss.Last is lazy so will only mark ss.lastNeedsUpdate.
// Remove last message.
removed, err = fs.RemoveMsg(4)
require_NoError(t, err)
require_True(t, removed)

// Will update last, so corrects to 3.
// ss.Last is lazy, just like ss.First, but it's not recalculated. Only total msg count decreases.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 2)
require_Equal(t, ss.First, 2)
expectFirstSeq(2)
require_Equal(t, ss.Last, 3)
expectLastSeq(3)
require_Equal(t, ss.Last, 4)

// Remove first message again.
removed, err = fs.RemoveMsg(2)
require_NoError(t, err)
require_True(t, removed)

// Since we only have one message left, must update ss.First and ensure ss.Last equals.
// Since we only have one message left, must update ss.First and set ss.Last to equal.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 3)
expectFirstSeq(3)
require_Equal(t, ss.Last, 3)
expectLastSeq(3)

// Publish some more messages so we can test another scenario.
for i := 0; i < 3; i++ {
@@ -229,9 +208,7 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
ss = getSubjectState()
require_Equal(t, ss.Msgs, 4)
require_Equal(t, ss.First, 3)
expectFirstSeq(3)
require_Equal(t, ss.Last, 7)
expectLastSeq(7)

// Remove last sequence, ss.Last is lazy so doesn't get updated.
removed, err = fs.RemoveMsg(7)
@@ -243,18 +220,18 @@ func TestStoreSubjectStateConsistency(t *testing.T) {
require_NoError(t, err)
require_True(t, removed)

// Remove (now) first sequence. Both ss.First and ss.Last are lazy and both need to be recalculated later.
// Remove (now) first sequence, but because ss.First is lazy we first need to recalculate
// to know seq 5 became ss.First. And since we're removing seq 5 we need to recalculate ss.First
// yet again, since ss.Last is lazy and is not correct.
removed, err = fs.RemoveMsg(5)
require_NoError(t, err)
require_True(t, removed)

// ss.First and ss.Last should both be recalculated and equal each other.
// ss.First should equal ss.Last, last should have been updated now.
ss = getSubjectState()
require_Equal(t, ss.Msgs, 1)
require_Equal(t, ss.First, 6)
expectFirstSeq(6)
require_Equal(t, ss.Last, 6)
expectLastSeq(6)
},
)
}