diff --git a/server/consumer.go b/server/consumer.go index ec98c8ae92c..44b65f795f8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -321,6 +321,7 @@ type consumer struct { stream string sseq uint64 // next stream sequence subjf subjectFilters // subject filters and their sequences + filters *Sublist // When we have multiple filters we will use LoadNextMsgMulti and pass this in. dseq uint64 // delivered consumer sequence adflr uint64 // ack delivery floor asflr uint64 // ack store floor @@ -401,12 +402,8 @@ type consumer struct { // A single subject filter. type subjectFilter struct { subject string - nextSeq uint64 - currentSeq uint64 - pmsg *jsPubMsg - err error - hasWildcard bool tokenizedSubject []string + hasWildcard bool } type subjectFilters []*subjectFilter @@ -947,8 +944,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.store = store } - subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects) - for _, filter := range subjects { + for _, filter := range gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects) { sub := &subjectFilter{ subject: filter, hasWildcard: subjectHasWildcard(filter), @@ -957,6 +953,18 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.subjf = append(o.subjf, sub) } + // If we have multiple filter subjects, create a sublist which we will use + // in calling store.LoadNextMsgMulti. + if len(o.cfg.FilterSubjects) > 0 { + o.filters = NewSublistWithCache() + for _, filter := range o.cfg.FilterSubjects { + o.filters.Insert(&subscription{subject: []byte(filter)}) + } + } else { + // Make sure this is nil otherwise. + o.filters = nil + } + if o.store != nil && o.store.HasState() { // Restore our saved state. o.mu.Lock() @@ -1967,16 +1975,6 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { hasWildcard: subjectHasWildcard(newFilter), tokenizedSubject: tokenizeSubjectIntoSlice(nil, newFilter), } - // If given subject was present, we will retain its fields values - // so `getNextMgs` can take advantage of already buffered `pmsgs`. - for _, oldFilter := range o.subjf { - if oldFilter.subject == newFilter { - fs.currentSeq = oldFilter.currentSeq - fs.nextSeq = oldFilter.nextSeq - fs.pmsg = oldFilter.pmsg - } - continue - } newSubjf = append(newSubjf, fs) } // Make sure we have correct signaling setup. @@ -1990,8 +1988,17 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { // If filters were removed, set `o.subjf` to nil. if len(newSubjf) == 0 { o.subjf = nil + o.filters = nil } else { o.subjf = newSubjf + if len(o.subjf) == 1 { + o.filters = nil + } else { + o.filters = NewSublistWithCache() + for _, filter := range o.subjf { + o.filters.Insert(&subscription{subject: []byte(filter.subject)}) + } + } } } @@ -3590,100 +3597,39 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { // Hold onto this since we release the lock. store := o.mset.store - // If no filters are specified, optimize to fetch just non-filtered messages. - if len(o.subjf) == 0 { - // Grab next message applicable to us. - // We will unlock here in case lots of contention, e.g. WQ. 
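// [Editor's sketch, not part of the patch] The unlock/relock around the
// store load removed here is a pattern the rewrite below keeps: LoadNextMsg
// can block on disk I/O, and holding the consumer lock across it would
// serialize all pull requests, which hurts badly on work queues. A minimal
// illustration, assuming a hypothetical guarded type plus the package's
// StreamStore/StoreMsg types and a sync import:

type guarded struct {
	mu    sync.Mutex
	sseq  uint64
	store StreamStore
}

func (g *guarded) next(smp *StoreMsg) (*StoreMsg, error) {
	g.mu.Lock()
	seq, store := g.sseq, g.store // Snapshot what the load needs.
	g.mu.Unlock()                 // Drop the lock across blocking I/O.
	sm, nseq, err := store.LoadNextMsg(_EMPTY_, false, seq, smp)
	g.mu.Lock() // Re-acquire before touching shared state.
	defer g.mu.Unlock()
	if nseq >= g.sseq { // State may have moved while we were unlocked.
		g.sseq = nseq + 1 // Only ever advance forward.
	}
	return sm, err
}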
- o.mu.Unlock() - pmsg := getJSPubMsgFromPool() - sm, sseq, err := store.LoadNextMsg(_EMPTY_, false, o.sseq, &pmsg.StoreMsg) - if sm == nil { - pmsg.returnToPool() - pmsg = nil - } - o.mu.Lock() - if sseq >= o.sseq { - o.sseq = sseq + 1 - if err == ErrStoreEOF { - o.updateSkipped(o.sseq) - } - } - return pmsg, 1, err - } - - // if we have filters, iterate over filters and optimize by buffering found messages. - for _, filter := range o.subjf { - if filter.nextSeq < o.sseq { - // o.subjf should always point to the right starting point for reading messages - // if anything modified it, make sure our sequence do not start earlier. - filter.nextSeq = o.sseq - } - // if this subject didn't fetch any message before, do it now - if filter.pmsg == nil { - // We will unlock here in case lots of contention, e.g. WQ. - filterSubject, filterWC, nextSeq := filter.subject, filter.hasWildcard, filter.nextSeq - o.mu.Unlock() - pmsg := getJSPubMsgFromPool() - sm, sseq, err := store.LoadNextMsg(filterSubject, filterWC, nextSeq, &pmsg.StoreMsg) - o.mu.Lock() - - filter.err = err - - if sm != nil { - filter.pmsg = pmsg - } else { - pmsg.returnToPool() - pmsg = nil - } - if sseq >= filter.nextSeq { - filter.nextSeq = sseq + 1 - } + var sseq uint64 + var err error + var sm *StoreMsg + var pmsg = getJSPubMsgFromPool() - // If we're sure that this filter has continuous sequence of messages, skip looking up other filters. - if nextSeq == sseq && err != ErrStoreEOF { - break - } - } + // Grab next message applicable to us. + // We will unlock here in case lots of contention, e.g. WQ. + o.mu.Unlock() + // Check if we are multi-filtered or not. + if o.filters != nil { + sm, sseq, err = store.LoadNextMsgMulti(o.filters, o.sseq, &pmsg.StoreMsg) + } else if o.subjf != nil { // Means single filtered subject since o.filters means > 1. + filter, wc := o.subjf[0].subject, o.subjf[0].hasWildcard + sm, sseq, err = store.LoadNextMsg(filter, wc, o.sseq, &pmsg.StoreMsg) + } else { + // No filter here. + sm, sseq, err = store.LoadNextMsg(_EMPTY_, false, o.sseq, &pmsg.StoreMsg) } - - // Don't sort the o.subjf if it's only one entry - // Sort uses `reflect` and can noticeably slow down fetching, - // even if len == 0 or 1. - // TODO(tp): we should have sort based off generics for server - // to avoid reflection. - if len(o.subjf) > 1 { - sort.Slice(o.subjf, func(i, j int) bool { - if o.subjf[j].pmsg != nil && o.subjf[i].pmsg == nil { - return false - } - if o.subjf[i].pmsg != nil && o.subjf[j].pmsg == nil { - return true - } - return o.subjf[j].nextSeq > o.subjf[i].nextSeq - }) + if sm == nil { + pmsg.returnToPool() + pmsg = nil } - // Grab next message applicable to us. - // Sort sequences first, to grab the first message. - filter := o.subjf[0] - err := filter.err - // This means we got a message in this subject fetched. - if filter.pmsg != nil { - filter.currentSeq = filter.nextSeq - o.sseq = filter.currentSeq - returned := filter.pmsg - filter.pmsg = nil - return returned, 1, err - } - - // set o.sseq to the first subject sequence - if filter.nextSeq > o.sseq { - o.sseq = filter.nextSeq - if err == ErrStoreEOF { - o.updateSkipped(o.sseq) + o.mu.Lock() + // Check if we should move our o.sseq. + if sseq >= o.sseq { + // If we are moving step by step then sseq == o.sseq. + // If we have jumped we should update skipped for other replicas. 
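// [Editor's sketch, not part of the patch] Why updateSkipped gets sseq+1:
// the store reports the last sequence it examined, so the next sequence a
// replica should consider is one past it. If a replicated consumer asked
// for sequence 10 but the store hit EOF at 500, followers must learn of
// the jump or their delivered state diverges from the leader's. A toy
// model, with applySkip as a hypothetical follower-side handler:

func applySkip(followerSseq, skipTo uint64) uint64 {
	// The leader observed EOF at skipTo-1 and replicated skip(skipTo).
	if skipTo > followerSseq {
		return skipTo // Jump forward so replicas agree on o.sseq.
	}
	return followerSseq // Never move backwards on a skip.
}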
+ if sseq != o.sseq && err == ErrStoreEOF { + o.updateSkipped(sseq + 1) } + o.sseq = sseq + 1 } - - return nil, 0, err + return pmsg, 1, err } // Will check for expiration and lack of interest on waiting requests. @@ -4102,17 +4048,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { wr.hbt = time.Now().Add(wr.hb) } } else { - if o.subjf != nil { - tsa := [32]string{} - tts := tokenizeSubjectIntoSlice(tsa[:0], pmsg.subj) - for i, filter := range o.subjf { - if isSubsetMatchTokenized(tts, filter.tokenizedSubject) { - o.subjf[i].currentSeq-- - o.subjf[i].nextSeq-- - break - } - } - } // We will redo this one. o.sseq-- if dc == 1 { @@ -4339,11 +4274,7 @@ func (o *consumer) calculateNumPending() (npc, npf uint64) { } // Consumer with filters. for _, filter := range o.subjf { - // We might loose state of o.subjf, so if we do recover from o.sseq - if filter.currentSeq < o.sseq { - filter.currentSeq = o.sseq - } - lnpc, lnpf := o.mset.store.NumPending(filter.currentSeq, filter.subject, false) + lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, false) npc += lnpc if lnpf > npf { npf = lnpf // Always last @@ -4865,7 +4796,6 @@ func (o *consumer) selectStartingSeqNo() { // If we are partitioned here this will be properly set when we become leader. for _, filter := range o.subjf { ss := o.mset.store.FilteredState(1, filter.subject) - filter.nextSeq = ss.Last if ss.Last > o.sseq { o.sseq = ss.Last } @@ -4946,30 +4876,11 @@ func (o *consumer) selectStartingSeqNo() { if state.FirstSeq == 0 { o.sseq = 1 - for _, filter := range o.subjf { - filter.nextSeq = 1 - } } else if o.sseq < state.FirstSeq { o.sseq = state.FirstSeq } else if o.sseq > state.LastSeq { o.sseq = state.LastSeq + 1 } - for _, filter := range o.subjf { - if state.FirstSeq == 0 { - filter.nextSeq = 1 - } - if filter.nextSeq < state.FirstSeq { - filter.nextSeq = state.FirstSeq - } - if filter.nextSeq > state.LastSeq { - filter.nextSeq = state.LastSeq + 1 - } - } - } - if o.subjf != nil { - sort.Slice(o.subjf, func(i, j int) bool { - return o.subjf[j].nextSeq > o.subjf[i].nextSeq - }) } // Always set delivery sequence to 1. diff --git a/server/filestore.go b/server/filestore.go index 8552c84b110..642d354841d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2238,6 +2238,50 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { return 0 } +// Find the first matching message against a sublist. +func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) { + mb.mu.Lock() + defer mb.mu.Unlock() + + // Will just do linear walk for now. + // TODO(dlc) - Be better at skipping blocks that will not match us regardless. + + var didLoad bool + // Need messages loaded from here on out. + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return nil, false, err + } + didLoad = true + } + + // Make sure to start at mb.first.seq if fseq < mb.first.seq + if seq := atomic.LoadUint64(&mb.first.seq); seq > start { + start = seq + } + lseq := atomic.LoadUint64(&mb.last.seq) + + if sm == nil { + sm = new(StoreMsg) + } + + for seq := start; seq <= lseq; seq++ { + llseq := mb.llseq + fsm, err := mb.cacheLookup(seq, sm) + if err != nil { + continue + } + expireOk := seq == lseq && mb.llseq == seq + + if r := sl.Match(fsm.subj); len(r.psubs) > 0 { + return fsm, expireOk, nil + } + // If we are here we did not match, so put the llseq back. + mb.llseq = llseq + } + return nil, didLoad, ErrStoreMsgNotFound +} + // Find the first matching message. 
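// [Editor's note] firstMatchingMulti above settles for a linear walk,
// asking the sublist to classify every stored subject, while firstMatching
// below can lean on the per-block fss index for its single filter. What
// sl.Match amounts to per message is tokenized subject matching; a
// self-contained sketch of those semantics (not the actual Sublist code),
// assuming a strings import:

// matchesAny reports whether subject matches any NATS-style filter, where
// "*" matches exactly one token and ">" matches one or more trailing tokens.
func matchesAny(subject string, filters []string) bool {
	toks := strings.Split(subject, ".")
outer:
	for _, f := range filters {
		fts := strings.Split(f, ".")
		for i, ft := range fts {
			if ft == ">" {
				if i < len(toks) {
					return true // ">" consumes the remaining token(s).
				}
				continue outer // ">" needs at least one token left.
			}
			if i >= len(toks) || (ft != "*" && ft != toks[i]) {
				continue outer // Token mismatch, try the next filter.
			}
		}
		if len(fts) == len(toks) {
			return true // All tokens matched at equal depth.
		}
	}
	return false
}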
func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) { mb.mu.Lock() @@ -2277,6 +2321,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor } // Only do linear scan if isAll or we are wildcarded and have to traverse more fss than actual messages. doLinearScan := isAll || (wc && len(mb.fss) > int(lseq-fseq)) + if !doLinearScan { // If we have a wildcard match against all tracked subjects we know about. if wc { @@ -6158,6 +6203,44 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e return sm, err } +// LoadNextMsgMulti will find the next message matching any entry in the sublist. +func (fs *fileStore) LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) { + if sl == nil { + return fs.LoadNextMsg(_EMPTY_, false, start, smp) + } + fs.mu.RLock() + defer fs.mu.RUnlock() + + if fs.closed { + return nil, 0, ErrStoreClosed + } + if fs.state.Msgs == 0 { + return nil, fs.state.LastSeq, ErrStoreEOF + } + if start < fs.state.FirstSeq { + start = fs.state.FirstSeq + } + + if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 { + for i := bi; i < len(fs.blks); i++ { + mb := fs.blks[i] + if sm, expireOk, err := mb.firstMatchingMulti(sl, start, smp); err == nil { + if expireOk { + mb.tryForceExpireCache() + } + return sm, sm.seq, nil + } else if err != ErrStoreMsgNotFound { + return nil, 0, err + } else if expireOk { + mb.tryForceExpireCache() + } + } + } + + return nil, fs.state.LastSeq, ErrStoreEOF + +} + func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, uint64, error) { fs.mu.RLock() defer fs.mu.RUnlock() @@ -6165,6 +6248,9 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store if fs.closed { return nil, 0, ErrStoreClosed } + if fs.state.Msgs == 0 { + return nil, fs.state.LastSeq, ErrStoreEOF + } if start < fs.state.FirstSeq { start = fs.state.FirstSeq } diff --git a/server/filestore_test.go b/server/filestore_test.go index de25e7728f7..936b3ef33fc 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6755,6 +6755,67 @@ func TestFileStoreMsgBlockFirstAndLastSeqCorrupt(t *testing.T) { require_Equal(t, lseq, 10) } +func TestFileStoreMsgLoadNextMsgMulti(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + // Put 1k msgs in + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, []byte("ZZZ")) + } + + var smv StoreMsg + // Do multi load next with 1 wc entry. + sl := NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.>")}) + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Now do multi load next with 1000 literal subjects. + sl = NewSublistWithCache() + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + sl.Insert(&subscription{subject: []byte(subj)}) + } + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Check that we can pull out 3 individuals. 
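// [Editor's note] The individual pulls below rely on the storage layout
// from the fill loop above: foo.<n> is the (n+1)-th message stored, so it
// sits at stream sequence n+1 (foo.2 -> 3, foo.222 -> 223, foo.999 -> 1000).
// On success LoadNextMsgMulti reports the matched sequence; once nothing
// past start matches, it reports (nil, state.LastSeq, ErrStoreEOF), which
// is why the final fetch errors with seq still 1000. A hypothetical helper
// spelling out the arithmetic:

// expectedSeq maps test subject foo.<n> to the stream sequence it was
// stored at by the 1k-message fill loop.
func expectedSeq(n uint64) uint64 { return n + 1 }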
+ sl = NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.2")}) + sl.Insert(&subscription{subject: []byte("foo.222")}) + sl.Insert(&subscription{subject: []byte("foo.999")}) + sm, seq, err := fs.LoadNextMsgMulti(sl, 1, &smv) + require_NoError(t, err) + require_True(t, sm.subj == "foo.2") + require_Equal(t, seq, 3) + sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_True(t, sm.subj == "foo.222") + require_Equal(t, seq, 223) + sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_True(t, sm.subj == "foo.999") + require_Equal(t, seq, 1000) + _, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_Error(t, err) + require_Equal(t, seq, 1000) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 8e4f0338b27..2bc4108246e 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -32,7 +32,6 @@ import ( ) func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) { - s := RunBasicJetStreamServer(t) defer s.Shutdown() @@ -78,7 +77,6 @@ func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) { msgs, err = consumer.Fetch(1) require_NoError(t, err) require_True(t, len(msgs) == 1) - } func TestJetStreamConsumerMultipleFiltersRace(t *testing.T) { @@ -274,7 +272,8 @@ func TestJetStreamConsumerMultipleConsumersSingleFilter(t *testing.T) { info, err := js.ConsumerInfo("TEST", consumer.name) require_NoError(t, err) if info.Delivered.Consumer != uint64(consumer.expectedMsgs) { - return fmt.Errorf("%v:expected consumer delivered seq %v, got %v. actually delivered: %v", consumer.name, consumer.expectedMsgs, info.Delivered.Consumer, consumer.delivered.Load()) + return fmt.Errorf("%v:expected consumer delivered seq %v, got %v. actually delivered: %v", + consumer.name, consumer.expectedMsgs, info.Delivered.Consumer, consumer.delivered.Load()) } if info.AckFloor.Consumer != uint64(consumer.expectedMsgs) { return fmt.Errorf("%v: expected consumer ack floor %v, got %v", consumer.name, totalMsgs, info.AckFloor.Consumer) @@ -399,7 +398,8 @@ func TestJetStreamConsumerMultipleConsumersMultipleFilters(t *testing.T) { info, err := js.ConsumerInfo("TEST", consumer.name) require_NoError(t, err) if info.Delivered.Consumer != uint64(consumer.expectedMsgs) { - return fmt.Errorf("%v:expected consumer delivered seq %v, got %v. actually delivered: %v", consumer.name, consumer.expectedMsgs, info.Delivered.Consumer, consumer.delivered.Load()) + return fmt.Errorf("%v:expected consumer delivered seq %v, got %v. 
actually delivered: %v", + consumer.name, consumer.expectedMsgs, info.Delivered.Consumer, consumer.delivered.Load()) } if info.AckFloor.Consumer != uint64(consumer.expectedMsgs) { return fmt.Errorf("%v: expected consumer ack floor %v, got %v", consumer.name, totalMsgs, info.AckFloor.Consumer) @@ -621,7 +621,6 @@ func TestJetStreamConsumerActionsOnWorkQueuePolicyStream(t *testing.T) { } func TestJetStreamConsumerActionsViaAPI(t *testing.T) { - s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/memstore.go b/server/memstore.go index 780f7b533c0..2af1d2c686c 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1033,6 +1033,40 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error return smp, nil } +// LoadNextMsgMulti will find the next message matching any entry in the sublist. +func (ms *memStore) LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) { + // TODO(dlc) - for now simple linear walk to get started. + ms.mu.RLock() + defer ms.mu.RUnlock() + + if start < ms.state.FirstSeq { + start = ms.state.FirstSeq + } + + // If past the end no results. + if start > ms.state.LastSeq || ms.state.Msgs == 0 { + return nil, ms.state.LastSeq, ErrStoreEOF + } + + // Initial setup. + fseq, lseq := start, ms.state.LastSeq + + for nseq := fseq; nseq <= lseq; nseq++ { + sm, ok := ms.msgs[nseq] + if !ok { + continue + } + if r := sl.Match(sm.subj); len(r.psubs) > 0 { + if smp == nil { + smp = new(StoreMsg) + } + sm.copy(smp) + return smp, nseq, nil + } + } + return nil, ms.state.LastSeq, ErrStoreEOF +} + // LoadNextMsg will find the next message matching the filter subject starting at the start sequence. // The filter subject can be a wildcard. func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) { @@ -1044,7 +1078,7 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store } // If past the end no results. - if start > ms.state.LastSeq { + if start > ms.state.LastSeq || ms.state.Msgs == 0 { return nil, ms.state.LastSeq, ErrStoreEOF } @@ -1053,7 +1087,7 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store } isAll := filter == fwcs - // Skip scan of ms.fss is number of messages in the block are less than + // Skip scan of ms.fss if number of messages in the block are less than // 1/2 the number of subjects in ms.fss. Or we have a wc and lots of fss entries. const linearScanMaxFSS = 256 doLinearScan := isAll || 2*int(ms.state.LastSeq-start) < ms.fss.Size() || (wc && ms.fss.Size() > linearScanMaxFSS) diff --git a/server/memstore_test.go b/server/memstore_test.go index 3c049233447..2d35262b855 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1051,3 +1051,67 @@ func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) { require_Equal(t, state.LastSeq, 10) require_Equal(t, state.Msgs, 1) } + +func TestMemStoreMsgLoadNextMsgMulti(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"foo.*"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + // Put 1k msgs in + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + ms.StoreMsg(subj, nil, []byte("ZZZ")) + } + + var smv StoreMsg + // Do multi load next with 1 wc entry. 
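// [Editor's note] Here, as in consumer.go above, the Sublist serves purely
// as a subject matcher: the inserted subscriptions carry nothing but a
// subject, and "matches" is simply len(r.psubs) > 0. A hypothetical helper
// capturing the idiom used throughout these tests:

func newMatcher(filters ...string) *Sublist {
	sl := NewSublistWithCache()
	for _, f := range filters {
		sl.Insert(&subscription{subject: []byte(f)}) // Subject-only stub sub.
	}
	return sl
}

// Usage sketch: sl := newMatcher("foo.>"), or newMatcher("Z.foo", "Z.bar").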
+ sl := NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.>")}) + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := ms.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Now do multi load next with 1000 literal subjects. + sl = NewSublistWithCache() + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + sl.Insert(&subscription{subject: []byte(subj)}) + } + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := ms.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Check that we can pull out 3 individuals. + sl = NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.2")}) + sl.Insert(&subscription{subject: []byte("foo.222")}) + sl.Insert(&subscription{subject: []byte("foo.999")}) + sm, seq, err := ms.LoadNextMsgMulti(sl, 1, &smv) + require_NoError(t, err) + require_True(t, sm.subj == "foo.2") + require_Equal(t, seq, 3) + sm, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_True(t, sm.subj == "foo.222") + require_Equal(t, seq, 223) + sm, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_True(t, sm.subj == "foo.999") + require_Equal(t, seq, 1000) + _, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) + require_Error(t, err) + require_Equal(t, seq, 1000) +} diff --git a/server/norace_test.go b/server/norace_test.go index 800ccb27606..e97e69c0457 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10121,3 +10121,160 @@ func TestNoRaceConnectionObjectReleased(t *testing.T) { } } } + +func TestNoRaceFileStoreMsgLoadNextMsgMultiPerf(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + // Put 1k msgs in + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, []byte("ZZZ")) + } + + var smv StoreMsg + + // Now do normal load next with no filter. + // This is baseline. + start := time.Now() + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsg(_EMPTY_, false, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + baseline := time.Since(start) + t.Logf("Single - No filter %v", baseline) + + // Now do normal load next with wc filter. + start = time.Now() + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsg("foo.>", true, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + elapsed := time.Since(start) + require_True(t, elapsed < 2*baseline) + t.Logf("Single - WC filter %v", elapsed) + + // Now do multi load next with 1 wc entry. + sl := NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.>")}) + start = time.Now() + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + elapsed = time.Since(start) + require_True(t, elapsed < 2*baseline) + t.Logf("Multi - Single WC filter %v", elapsed) + + // Now do multi load next with 1000 literal subjects. 
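// [Editor's note] The perf test below asserts each strategy stays within
// 2x the unfiltered baseline. That holds even for 1000 literal filters
// because Sublist.Match walks the subject's tokens through per-level hash
// lookups, so cost tracks token depth rather than filter count. A hedged
// benchmark sketch of that property (names here are illustrative, not
// from the patch):

func BenchmarkSublistMatchLiteralFanout(b *testing.B) {
	sl := NewSublistWithCache()
	for i := 0; i < 1000; i++ {
		sl.Insert(&subscription{subject: []byte(fmt.Sprintf("foo.%d", i))})
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if r := sl.Match("foo.500"); len(r.psubs) != 1 {
			b.Fatal("expected exactly one literal match")
		}
	}
}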
+ sl = NewSublistWithCache() + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + sl.Insert(&subscription{subject: []byte(subj)}) + } + start = time.Now() + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + elapsed = time.Since(start) + require_True(t, elapsed < 2*baseline) + t.Logf("Multi - 1000 filters %v", elapsed) +} + +func TestNoRaceWQAndMultiSubjectFilters(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"Z.>"}, + Retention: nats.WorkQueuePolicy, + }) + require_NoError(t, err) + + stopPubs := make(chan bool) + + publish := func(subject string) { + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for { + select { + case <-stopPubs: + return + default: + _, _ = js.Publish(subject, []byte("hello")) + } + } + } + + go publish("Z.foo") + go publish("Z.bar") + go publish("Z.baz") + + // Cancel pubs after 10s. + time.AfterFunc(10*time.Second, func() { close(stopPubs) }) + + // Create a consumer + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "zzz", + AckPolicy: nats.AckExplicitPolicy, + AckWait: 5 * time.Second, + FilterSubjects: []string{"Z.foo", "Z.bar", "Z.baz"}, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe(_EMPTY_, "zzz", nats.Bind("TEST", "zzz")) + require_NoError(t, err) + + received := make([]uint64, 0, 256_000) + batchSize := 10 + + for running := true; running; { + msgs, err := sub.Fetch(batchSize, nats.MaxWait(2*time.Second)) + if err == nats.ErrTimeout { + running = false + } + for _, m := range msgs { + meta, err := m.Metadata() + require_NoError(t, err) + received = append(received, meta.Sequence.Stream) + m.Ack() + } + } + + sort.Slice(received, func(i, j int) bool { return received[i] < received[j] }) + + var pseq, gaps uint64 + for _, seq := range received { + if pseq != 0 && pseq != seq-1 { + gaps += seq - pseq + 1 + } + pseq = seq + } + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + + if si.State.Msgs != 0 || gaps > 0 { + t.Fatalf("Orphaned msgs %d with %d gaps detected", si.State.Msgs, gaps) + } +} diff --git a/server/store.go b/server/store.go index e013ac1aa96..515b305871c 100644 --- a/server/store.go +++ b/server/store.go @@ -90,6 +90,7 @@ type StreamStore interface { SkipMsgs(seq uint64, num uint64) error LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) + LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error) RemoveMsg(seq uint64) (bool, error) EraseMsg(seq uint64) (bool, error)
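// [Editor's note] With the interface extended above, both stores satisfy
// StreamStore, but only the filestore LoadNextMsgMulti guards against a
// nil sublist (falling back to LoadNextMsg); the memstore version assumes
// callers, like getNextMsg in consumer.go, only ever pass a non-nil sl.
// A hedged convenience wrapper making the dispatch explicit:

func loadNext(store StreamStore, sl *Sublist, filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error) {
	if sl != nil {
		return store.LoadNextMsgMulti(sl, start, smp) // Two or more filters.
	}
	return store.LoadNextMsg(filter, wc, start, smp) // One filter, or none.
}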