Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] (2.11) Replicated consumer skipped redeliveries #6566

Merged
merged 4 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,8 +1373,14 @@ func (o *consumer) setLeader(isLeader bool) {
o.rdq = nil
o.rdqi.Empty()

// Restore our saved state. During non-leader status we just update our underlying store.
o.readStoredState(lseq)
// Restore our saved state.
// During non-leader status we just update our underlying store when not clustered.
// If clustered we need to propose our initial (possibly skipped ahead) o.sseq to the group.
if o.node == nil || o.dseq > 1 || (o.store != nil && o.store.HasState()) {
o.readStoredState(lseq)
} else if o.node != nil && o.sseq >= 1 {
o.updateSkipped(o.sseq)
}

// Setup initial num pending.
o.streamNumPending()
Expand All @@ -1384,11 +1390,6 @@ func (o *consumer) setLeader(isLeader bool) {
o.lss = nil
}

// Update the group on the our starting sequence if we are starting but we skipped some in the stream.
if o.dseq == 1 && o.sseq > 1 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is replaced by the change above ^.

o.updateSkipped(o.sseq)
}

// Do info sub.
if o.infoSub == nil && jsa != nil {
isubj := fmt.Sprintf(clusterConsumerInfoT, jsa.acc(), stream, o.name)
Expand Down Expand Up @@ -2811,10 +2812,7 @@ func (o *consumer) applyState(state *ConsumerState) {
return
}

// If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
if !o.isLeader() || o.sseq <= state.Delivered.Stream {
o.sseq = state.Delivered.Stream + 1
}
o.sseq = state.Delivered.Stream + 1
o.dseq = state.Delivered.Consumer + 1
o.adflr = state.AckFloor.Consumer
o.asflr = state.AckFloor.Stream
Expand Down Expand Up @@ -2972,9 +2970,13 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
}
// If we are the leader we could have o.sseq that is skipped ahead.
// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
// Only use skipped ahead o.sseq if we're a new consumer and have not yet replicated this state yet.
leader := o.isLeader()
if !leader || o.store.HasState() {
info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
}
info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
if !o.isLeader() {
if !leader {
info.NumAckPending = len(state.Pending)
info.NumRedelivered = len(state.Redelivered)
}
Expand Down Expand Up @@ -4821,16 +4823,16 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
// Update delivered first.
o.updateDelivered(dseq, seq, dc, ts)

// Send message.
o.outq.send(pmsg)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could send a message to the client before o.trackPending captures the sequence below, but it likely doesn't happen in practice. Moving it to be below o.trackPending doesn't hurt.


if ap == AckExplicit || ap == AckAll {
o.trackPending(seq, dseq)
} else if ap == AckNone {
o.adflr = dseq
o.asflr = seq
}

// Send message.
o.outq.send(pmsg)

// Flow control.
if o.maxpb > 0 && o.needFlowControl(psz) {
o.sendFlowControl()
Expand Down Expand Up @@ -5291,13 +5293,13 @@ func (o *consumer) selectStartingSeqNo() {
} else if o.cfg.DeliverPolicy == DeliverLast {
if o.subjf == nil {
o.sseq = state.LastSeq
return
}
// If we are partitioned here this will be properly set when we become leader.
for _, filter := range o.subjf {
ss := o.mset.store.FilteredState(1, filter.subject)
if ss.Last > o.sseq {
o.sseq = ss.Last
} else {
// If we are partitioned here this will be properly set when we become leader.
for _, filter := range o.subjf {
ss := o.mset.store.FilteredState(1, filter.subject)
if ss.Last > o.sseq {
o.sseq = ss.Last
}
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
Expand Down Expand Up @@ -5397,7 +5399,8 @@ func (o *consumer) selectStartingSeqNo() {
// Set ack store floor to store-1
o.asflr = o.sseq - 1
// Set our starting sequence state.
if o.store != nil && o.sseq > 0 {
// But only if we're not clustered, if clustered we propose upon becoming leader.
if o.store != nil && o.sseq > 0 && o.cfg.replicas(&o.mset.cfg) == 1 {
o.store.SetStarting(o.sseq - 1)
}
}
Expand Down
29 changes: 22 additions & 7 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9884,9 +9884,30 @@ func (o *consumerFileStore) SetStarting(sseq uint64) error {
return o.writeState(buf)
}

// UpdateStarting updates our starting stream sequence.
func (o *consumerFileStore) UpdateStarting(sseq uint64) {
o.mu.Lock()
defer o.mu.Unlock()

if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
// For AckNone just update delivered and ackfloor at the same time.
if o.cfg.AckPolicy == AckNone {
o.state.AckFloor.Stream = sseq
}
}
// Make sure we flush to disk.
o.kickFlusher()
}

// HasState returns if this store has a recorded state.
func (o *consumerFileStore) HasState() bool {
o.mu.Lock()
// We have a running state, or stored on disk but not yet initialized.
if o.state.Delivered.Consumer != 0 || o.state.Delivered.Stream != 0 {
o.mu.Unlock()
return true
}
_, err := os.Stat(o.ifn)
o.mu.Unlock()
return err == nil
Expand Down Expand Up @@ -9939,7 +9960,7 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
// Only update if greater then what we already have.
// Only update if greater than what we already have.
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
Expand Down Expand Up @@ -9975,12 +9996,6 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
return nil
}

// Match leader logic on checking if ack is ahead of delivered.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq + 1
}

if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
delete(o.state.Redelivered, sseq)
return ErrStoreMsgNotFound
Expand Down
12 changes: 7 additions & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5056,11 +5056,13 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
case updateSkipOp:
o.mu.Lock()
if !o.isLeader() {
var le = binary.LittleEndian
if sseq := le.Uint64(buf[1:]); sseq > o.sseq {
o.sseq = sseq
}
var le = binary.LittleEndian
sseq := le.Uint64(buf[1:])
if !o.isLeader() && sseq > o.sseq {
o.sseq = sseq
}
if o.store != nil {
o.store.UpdateStarting(sseq - 1)
}
o.mu.Unlock()
case addPendingRequest:
Expand Down
30 changes: 27 additions & 3 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6093,24 +6093,48 @@ func TestJetStreamClusterConsumerDeliveredSyncReporting(t *testing.T) {
require_NoError(t, err)
}

opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
require_Equal(t, ci.Delivered.Consumer, 0)
require_Equal(t, ci.Delivered.Stream, 0)
}

msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)
meta, err := msgs[0].Metadata()
require_NoError(t, err)
require_Equal(t, meta.Sequence.Consumer, 1)
require_Equal(t, meta.Sequence.Stream, 1)

// Allow some time for the state to propagate.
maxWait := 200 * time.Millisecond
time.Sleep(maxWait)

for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
require_Equal(t, ci.Delivered.Consumer, 1)
require_Equal(t, ci.Delivered.Stream, 1)
}

// Now we want to make sure that jsz reporting will show the same
// state, including delivered, which will have skipped to the end.
// The skip can happen on several factors, but for here we just send
// another pull request which we will let fail.
_, err = sub.Fetch(1, nats.MaxWait(200*time.Millisecond))
_, err = sub.Fetch(1, nats.MaxWait(maxWait))
require_Error(t, err)

opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
require_Equal(t, ci.Delivered.Consumer, 1)
require_Equal(t, ci.Delivered.Stream, 1)
require_Equal(t, ci.Delivered.Stream, 11)
}
}

Expand Down
Loading