Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Multi-Filtered Consumers #5274

Merged
merged 2 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 53 additions & 142 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ type consumer struct {
stream string
sseq uint64 // next stream sequence
subjf subjectFilters // subject filters and their sequences
filters *Sublist // When we have multiple filters we will use LoadNextMsgMulti and pass this in.
dseq uint64 // delivered consumer sequence
adflr uint64 // ack delivery floor
asflr uint64 // ack store floor
Expand Down Expand Up @@ -401,12 +402,8 @@ type consumer struct {
// A single subject filter.
type subjectFilter struct {
subject string
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
hasWildcard bool
tokenizedSubject []string
hasWildcard bool
}

type subjectFilters []*subjectFilter
Expand Down Expand Up @@ -947,8 +944,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.store = store
}

subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects)
for _, filter := range subjects {
for _, filter := range gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects) {
sub := &subjectFilter{
subject: filter,
hasWildcard: subjectHasWildcard(filter),
Expand All @@ -957,6 +953,18 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
o.subjf = append(o.subjf, sub)
}

// If we have multiple filter subjects, create a sublist which we will use
// in calling store.LoadNextMsgMulti.
if len(o.cfg.FilterSubjects) > 0 {
o.filters = NewSublistWithCache()
for _, filter := range o.cfg.FilterSubjects {
o.filters.Insert(&subscription{subject: []byte(filter)})
}
} else {
// Make sure this is nil otherwise.
o.filters = nil
}

if o.store != nil && o.store.HasState() {
// Restore our saved state.
o.mu.Lock()
Expand Down Expand Up @@ -1967,16 +1975,6 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
hasWildcard: subjectHasWildcard(newFilter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, newFilter),
}
// If given subject was present, we will retain its fields values
// so `getNextMgs` can take advantage of already buffered `pmsgs`.
for _, oldFilter := range o.subjf {
if oldFilter.subject == newFilter {
fs.currentSeq = oldFilter.currentSeq
fs.nextSeq = oldFilter.nextSeq
fs.pmsg = oldFilter.pmsg
}
continue
}
newSubjf = append(newSubjf, fs)
}
// Make sure we have correct signaling setup.
Expand All @@ -1990,8 +1988,17 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// If filters were removed, set `o.subjf` to nil.
if len(newSubjf) == 0 {
o.subjf = nil
o.filters = nil
} else {
o.subjf = newSubjf
if len(o.subjf) == 1 {
o.filters = nil
} else {
o.filters = NewSublistWithCache()
for _, filter := range o.subjf {
o.filters.Insert(&subscription{subject: []byte(filter.subject)})
}
}
}
}

Expand Down Expand Up @@ -3590,100 +3597,39 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// Hold onto this since we release the lock.
store := o.mset.store

// If no filters are specified, optimize to fetch just non-filtered messages.
if len(o.subjf) == 0 {
// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
o.mu.Unlock()
pmsg := getJSPubMsgFromPool()
sm, sseq, err := store.LoadNextMsg(_EMPTY_, false, o.sseq, &pmsg.StoreMsg)
if sm == nil {
pmsg.returnToPool()
pmsg = nil
}
o.mu.Lock()
if sseq >= o.sseq {
o.sseq = sseq + 1
if err == ErrStoreEOF {
o.updateSkipped(o.sseq)
}
}
return pmsg, 1, err
}

// if we have filters, iterate over filters and optimize by buffering found messages.
for _, filter := range o.subjf {
if filter.nextSeq < o.sseq {
// o.subjf should always point to the right starting point for reading messages
// if anything modified it, make sure our sequence do not start earlier.
filter.nextSeq = o.sseq
}
// if this subject didn't fetch any message before, do it now
if filter.pmsg == nil {
// We will unlock here in case lots of contention, e.g. WQ.
filterSubject, filterWC, nextSeq := filter.subject, filter.hasWildcard, filter.nextSeq
o.mu.Unlock()
pmsg := getJSPubMsgFromPool()
sm, sseq, err := store.LoadNextMsg(filterSubject, filterWC, nextSeq, &pmsg.StoreMsg)
o.mu.Lock()

filter.err = err

if sm != nil {
filter.pmsg = pmsg
} else {
pmsg.returnToPool()
pmsg = nil
}
if sseq >= filter.nextSeq {
filter.nextSeq = sseq + 1
}
var sseq uint64
var err error
var sm *StoreMsg
var pmsg = getJSPubMsgFromPool()

// If we're sure that this filter has continuous sequence of messages, skip looking up other filters.
if nextSeq == sseq && err != ErrStoreEOF {
break
}
}
// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
o.mu.Unlock()
// Check if we are multi-filtered or not.
if o.filters != nil {
sm, sseq, err = store.LoadNextMsgMulti(o.filters, o.sseq, &pmsg.StoreMsg)
} else if o.subjf != nil { // Means single filtered subject since o.filters means > 1.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are quite a few implicit assumptions and relationships here:

  1. That o.filters == nil means a single filter or no filters.
  2. That o.subjf != nil means a single filter.

Those are fine, but it's easy to introduce bugs in the future by changing how we update consumers, filters, etc.

filter, wc := o.subjf[0].subject, o.subjf[0].hasWildcard
sm, sseq, err = store.LoadNextMsg(filter, wc, o.sseq, &pmsg.StoreMsg)
} else {
// No filter here.
sm, sseq, err = store.LoadNextMsg(_EMPTY_, false, o.sseq, &pmsg.StoreMsg)
}

// Don't sort the o.subjf if it's only one entry
// Sort uses `reflect` and can noticeably slow down fetching,
// even if len == 0 or 1.
// TODO(tp): we should have sort based off generics for server
// to avoid reflection.
if len(o.subjf) > 1 {
sort.Slice(o.subjf, func(i, j int) bool {
if o.subjf[j].pmsg != nil && o.subjf[i].pmsg == nil {
return false
}
if o.subjf[i].pmsg != nil && o.subjf[j].pmsg == nil {
return true
}
return o.subjf[j].nextSeq > o.subjf[i].nextSeq
})
if sm == nil {
pmsg.returnToPool()
pmsg = nil
}
// Grab next message applicable to us.
// Sort sequences first, to grab the first message.
filter := o.subjf[0]
err := filter.err
// This means we got a message in this subject fetched.
if filter.pmsg != nil {
filter.currentSeq = filter.nextSeq
o.sseq = filter.currentSeq
returned := filter.pmsg
filter.pmsg = nil
return returned, 1, err
}

// set o.sseq to the first subject sequence
if filter.nextSeq > o.sseq {
o.sseq = filter.nextSeq
if err == ErrStoreEOF {
o.updateSkipped(o.sseq)
o.mu.Lock()
// Check if we should move our o.sseq.
if sseq >= o.sseq {
// If we are moving step by step then sseq == o.sseq.
// If we have jumped we should update skipped for other replicas.
if sseq != o.sseq && err == ErrStoreEOF {
o.updateSkipped(sseq + 1)
}
o.sseq = sseq + 1
}

return nil, 0, err
return pmsg, 1, err
}

// Will check for expiration and lack of interest on waiting requests.
Expand Down Expand Up @@ -4102,17 +4048,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
wr.hbt = time.Now().Add(wr.hb)
}
} else {
if o.subjf != nil {
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], pmsg.subj)
for i, filter := range o.subjf {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
o.subjf[i].currentSeq--
o.subjf[i].nextSeq--
break
}
}
}
// We will redo this one.
o.sseq--
if dc == 1 {
Expand Down Expand Up @@ -4339,11 +4274,7 @@ func (o *consumer) calculateNumPending() (npc, npf uint64) {
}
// Consumer with filters.
for _, filter := range o.subjf {
// We might loose state of o.subjf, so if we do recover from o.sseq
if filter.currentSeq < o.sseq {
filter.currentSeq = o.sseq
}
lnpc, lnpf := o.mset.store.NumPending(filter.currentSeq, filter.subject, false)
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, false)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
Expand Down Expand Up @@ -4865,7 +4796,6 @@ func (o *consumer) selectStartingSeqNo() {
// If we are partitioned here this will be properly set when we become leader.
for _, filter := range o.subjf {
ss := o.mset.store.FilteredState(1, filter.subject)
filter.nextSeq = ss.Last
if ss.Last > o.sseq {
o.sseq = ss.Last
}
Expand Down Expand Up @@ -4946,30 +4876,11 @@ func (o *consumer) selectStartingSeqNo() {

if state.FirstSeq == 0 {
o.sseq = 1
for _, filter := range o.subjf {
filter.nextSeq = 1
}
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
for _, filter := range o.subjf {
if state.FirstSeq == 0 {
filter.nextSeq = 1
}
if filter.nextSeq < state.FirstSeq {
filter.nextSeq = state.FirstSeq
}
if filter.nextSeq > state.LastSeq {
filter.nextSeq = state.LastSeq + 1
}
}
}
if o.subjf != nil {
sort.Slice(o.subjf, func(i, j int) bool {
return o.subjf[j].nextSeq > o.subjf[i].nextSeq
})
}

// Always set delivery sequence to 1.
Expand Down
86 changes: 86 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2238,6 +2238,50 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
return 0
}

// firstMatchingMulti scans this block, beginning at start, for the first
// message whose subject matches at least one entry in the sublist sl.
//
// The scan never begins before the block's first sequence and ends at the
// block's last sequence. If sm is nil a fresh StoreMsg is allocated for the
// lookups.
//
// Returns:
//   - on a match: the message, a flag that is true only when the match is the
//     block's last sequence and mb.llseq equals that sequence, and a nil error.
//   - when nothing matches: nil, whether this call had to load the block's
//     messages, and ErrStoreMsgNotFound.
//   - when loading the block's messages fails: nil, false and the load error.
//
// The block lock is held for the duration of the call.
func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	// Plain linear walk for now.
	// TODO(dlc) - Be better at skipping blocks that will not match us regardless.

	// Everything below needs the messages to be loaded.
	var didLoad bool
	if mb.cacheNotLoaded() {
		if err := mb.loadMsgsWithLock(); err != nil {
			return nil, false, err
		}
		didLoad = true
	}

	// Clamp the starting point to the first sequence this block holds.
	if fseq := atomic.LoadUint64(&mb.first.seq); start < fseq {
		start = fseq
	}
	lseq := atomic.LoadUint64(&mb.last.seq)

	if sm == nil {
		sm = new(StoreMsg)
	}

	for seq := start; seq <= lseq; seq++ {
		// Remember llseq so it can be put back if this message is not a match.
		// NOTE(review): presumably cacheLookup advances mb.llseq - confirm.
		prevLLSeq := mb.llseq
		fsm, err := mb.cacheLookup(seq, sm)
		if err != nil {
			// Lookup failures are skipped over; llseq is not restored here.
			continue
		}
		expireOk := seq == lseq && mb.llseq == seq

		if r := sl.Match(fsm.subj); len(r.psubs) == 0 {
			// Not one of ours, so put the llseq back and keep walking.
			mb.llseq = prevLLSeq
			continue
		}
		return fsm, expireOk, nil
	}
	return nil, didLoad, ErrStoreMsgNotFound
}

// Find the first matching message.
func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
mb.mu.Lock()
Expand Down Expand Up @@ -2277,6 +2321,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}
// Only do linear scan if isAll or we are wildcarded and have to traverse more fss than actual messages.
doLinearScan := isAll || (wc && len(mb.fss) > int(lseq-fseq))

if !doLinearScan {
// If we have a wildcard match against all tracked subjects we know about.
if wc {
Expand Down Expand Up @@ -6158,13 +6203,54 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e
return sm, err
}

// LoadNextMsgMulti returns the first message at or after start whose subject
// matches any entry in the sublist sl.
//
// A nil sl delegates to LoadNextMsg with an empty filter. Otherwise the store
// read lock is held while blocks are searched in order, beginning with the
// block selected for start (start is raised to the store's first sequence if
// it is lower).
//
// Returns:
//   - the message and its sequence when a match is found.
//   - nil, 0 and ErrStoreClosed when the store is closed.
//   - nil, the store's last sequence and ErrStoreEOF when the store holds no
//     messages or no remaining message matches.
//   - nil, 0 and the underlying error when a block fails with anything other
//     than ErrStoreMsgNotFound.
func (fs *fileStore) LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
	// Nothing to filter against, so use the regular unfiltered path.
	if sl == nil {
		return fs.LoadNextMsg(_EMPTY_, false, start, smp)
	}

	fs.mu.RLock()
	defer fs.mu.RUnlock()

	switch {
	case fs.closed:
		return nil, 0, ErrStoreClosed
	case fs.state.Msgs == 0:
		return nil, fs.state.LastSeq, ErrStoreEOF
	}

	if start < fs.state.FirstSeq {
		start = fs.state.FirstSeq
	}

	bi, _ := fs.selectMsgBlockWithIndex(start)
	if bi < 0 {
		// No block covers start.
		return nil, fs.state.LastSeq, ErrStoreEOF
	}

	for i := bi; i < len(fs.blks); i++ {
		mb := fs.blks[i]
		found, expireOk, ferr := mb.firstMatchingMulti(sl, start, smp)
		switch {
		case ferr == nil:
			if expireOk {
				mb.tryForceExpireCache()
			}
			return found, found.seq, nil
		case ferr != ErrStoreMsgNotFound:
			// A real failure, as opposed to "nothing matched in this block".
			return nil, 0, ferr
		case expireOk:
			// No match in this block; drop its cache when the block says that is ok.
			mb.tryForceExpireCache()
		}
	}

	return nil, fs.state.LastSeq, ErrStoreEOF
}

func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, uint64, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()

if fs.closed {
return nil, 0, ErrStoreClosed
}
if fs.state.Msgs == 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
if start < fs.state.FirstSeq {
start = fs.state.FirstSeq
}
Expand Down
Loading