Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.24-RC.2 #6257

Merged
merged 6 commits into from
Dec 13, 2024
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
module github.com/nats-io/nats-server/v2

go 1.21.0
go 1.22

toolchain go1.22.8

require (
github.com/klauspost/compress v1.17.11
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.8
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.31.0
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -4,12 +4,12 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.8 h1:+wee30071y3vCZAYRsnrmIPaOe47A/SkK/UBDPdIV70=
github.com/nats-io/nkeys v0.4.8/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
145 changes: 45 additions & 100 deletions server/filestore.go
Original file line number Diff line number Diff line change
@@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
@@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
}
if sseq <= ss.First {
update(ss)
@@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
@@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
@@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
t += ss.Msgs
@@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
@@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
if ss == nil {
@@ -7832,115 +7832,60 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

ss.Msgs--

// We can lazily calculate the first/last sequence when needed.
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first and/or last sequence for this subject in this block.
// Will recalulate the first sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
// Mark first as updated.
ss.firstNeedsUpdate = false

startSlot := int(startSeq - mb.cache.fseq)
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
}
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
} else if startSlot < 0 {
startSlot = 0
}

fseq := startSeq + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
var le = binary.LittleEndian
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false

fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false

lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
return
}
}
}
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
@@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
mb.clearCacheAndOffset()
// Now call with start sequence of 1, the old one
// This will panic without the fix.
mb.recalculateForSubj("foo", ss)
mb.recalculateFirstForSubj("foo", 1, ss)
// Make sure it was update properly.
require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
}
72 changes: 25 additions & 47 deletions server/memstore.go
Original file line number Diff line number Diff line change
@@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
@@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var totalSkipped uint64
// We will track start and end sequences as we go.
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
}
if sseq <= fss.First {
update(fss)
@@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subjs, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
}
oss := fss[subjs]
if oss.First == 0 { // New
@@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
var totalSkipped uint64
// We will track start and end sequences as we go.
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
}
if sseq <= fss.First {
update(fss)
@@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if !ms.removeMsg(ss.First, false) {
break
@@ -1267,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if !ok {
continue
}
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
}
if ss.First < fseq {
fseq = ss.First
@@ -1362,47 +1362,25 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--

// We can lazily calculate the first/last sequence when needed.
// We can lazily calculate the first sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// Will recalculate the first and/or last sequence for this subject.
// Will recalculate the first sequence for this subject in this block.
// Lock should be held.
func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
if ss.firstNeedsUpdate {
tseq := ss.First + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
return
}
break
}
}
}
if ss.lastNeedsUpdate {
tseq := ss.Last - 1
if tseq > ms.state.LastSeq {
tseq = ms.state.LastSeq
}
for ; tseq >= ss.First; tseq-- {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
tseq := startSeq + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
if ss.Msgs == 1 {
ss.First = tseq
ss.firstNeedsUpdate = false
}
return
}
ss.firstNeedsUpdate = false
return
}
}
}
Loading
Loading