NRG (2.11): Don't run catchup when behind on applies #6216

Merged
merged 1 commit on Dec 12, 2024
29 changes: 23 additions & 6 deletions server/jetstream_cluster.go
@@ -3303,7 +3303,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}

if isRecovering || !mset.IsLeader() {
if err := mset.processSnapshot(ss); err != nil {
if err := mset.processSnapshot(ss, ce.Index); err != nil {
return err
}
}
@@ -8343,11 +8343,12 @@ type streamSyncRequest struct {
FirstSeq uint64 `json:"first_seq"`
LastSeq uint64 `json:"last_seq"`
DeleteRangesOk bool `json:"delete_ranges"`
MinApplied uint64 `json:"min_applied"`
}

// Given a stream state that represents a snapshot, calculate the sync request based on our current state.
// Stream lock must be held.
func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState) *streamSyncRequest {
func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState, index uint64) *streamSyncRequest {
// Shouldn't happen, but consequences are pretty bad if we have the lock held and
// our caller tries to take the lock again on panic defer, as in processSnapshot.
if state == nil || snap == nil || mset.node == nil {
@@ -8357,7 +8358,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplica
if state.LastSeq >= snap.LastSeq {
return nil
}
return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true}
return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true, MinApplied: index}
}

// processSnapshotDeletes will update our current store based on the snapshot
@@ -8493,15 +8494,15 @@ var (
)

// Process a stream snapshot.
func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (e error) {
// Update any deletes, etc.
mset.processSnapshotDeletes(snap)
mset.setCLFS(snap.Failed)

mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
sreq := mset.calculateSyncRequest(&state, snap)
sreq := mset.calculateSyncRequest(&state, snap, index)

s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
@@ -8639,7 +8640,7 @@ RETRY:
mset.mu.RLock()
var state StreamState
mset.store.FastState(&state)
sreq = mset.calculateSyncRequest(&state, snap)
sreq = mset.calculateSyncRequest(&state, snap, index)
mset.mu.RUnlock()
if sreq == nil {
return nil
@@ -9187,6 +9188,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

// Setup sequences to walk through.
seq, last := sreq.FirstSeq, sreq.LastSeq

// The follower received a snapshot from another leader, and we've become leader since.
// We have an up-to-date log but could be behind on applies. We must wait until we've reached the minimum required.
// The follower will automatically retry after a timeout, so we can safely return here.
if node := mset.raftNode(); node != nil {
index, _, applied := node.Progress()
// Only skip if our log has enough entries, and they could be applied in the future.
if index >= sreq.MinApplied && applied < sreq.MinApplied {
return
}
// We know here we've either applied enough entries, or our log doesn't have enough entries.
// In the latter case the request expects us to have more. Just continue and value availability here.
// This should only be possible if the logs have already desynced, and we shouldn't have become leader
// in the first place. Not much we can do here in this (hypothetical) scenario.
}

mset.setCatchupPeer(sreq.Peer, last-seq)

// Check if we can compress during this.
88 changes: 88 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6815,6 +6815,94 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
}
}

func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.Publish("foo", nil)
require_NoError(t, err)

// Reconnect to stream leader.
l := c.streamLeader(globalAccountName, "TEST")
nc.Close()
nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

// Setup wiretap and grab stream.
sendSubject := "test-wiretap"
sub, err := nc.SubscribeSync(sendSubject)
require_NoError(t, err)
err = nc.Flush() // Must flush, otherwise our subscription could be too late.
require_NoError(t, err)
acc, err := l.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)

// We have a message at sequence 1, so expect a successful catchup.
sreq1 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq1) }))
// Expect the message at sequence 1.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, _, err := decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 1)
require_Equal(t, subj, "foo")
// And end with EOF.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Len(t, len(msg.Data), 0)

// Add one additional entry into the log that's not applied yet.
n := mset.node.(*raft)
n.Lock()
ae := n.buildAppendEntry(nil)
err = n.storeToWAL(ae)
n.Unlock()
index, commit, applied := n.Progress()
require_NoError(t, err)
require_LessThan(t, applied, index)
require_Equal(t, commit, applied)
// We have a message at sequence 1, but we haven't applied as many append entries.
// We can't fulfill the request right now as we don't know yet if
// that message will be deleted as part of upcoming append entries.
sreq2 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: index}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq2) }))
_, err = sub.NextMsg(time.Second)
require_Error(t, err, nats.ErrTimeout)

// We have a message at sequence 1, but we haven't applied as many append entries.
// Also, we seem to have a log that doesn't contain enough entries, even though we became leader.
// Something has already gone wrong and got the logs to desync.
// Value availability here and just fulfill the request.
sreq3 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: 100}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq3) }))
// Expect the message at sequence 1.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, _, err = decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 1)
require_Equal(t, subj, "foo")
// And end with EOF.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Len(t, len(msg.Data), 0)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
5 changes: 3 additions & 2 deletions server/jetstream_cluster_4_test.go
@@ -3997,8 +3997,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
// Processing a snapshot while there's no leader elected is considered a cluster reset.
// If a leader is temporarily unavailable we shouldn't blow away our state.
var snap StreamReplicatedState
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
err := mset.processSnapshot(&snap)
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
appliedIndex := uint64(0) // incorrect index, but doesn't matter for this test
err := mset.processSnapshot(&snap, appliedIndex)
require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
require_True(t, isClusterResetErr(err))
mset.resetClusteredState(err)
2 changes: 1 addition & 1 deletion server/raft.go
@@ -1651,7 +1651,7 @@ func (n *raft) State() RaftState {
func (n *raft) Progress() (index, commit, applied uint64) {
n.RLock()
defer n.RUnlock()
return n.pindex + 1, n.commit, n.applied
Member Author:
This was an off-by-one, but the value was not used anywhere in the code before. Now we've started using it.

n.pindex == n.commit == n.applied means we have a log where every entry is applied.
If n.pindex is 1 greater than n.commit, we have one uncommitted entry. If n.pindex is 1 greater than n.applied but equal to n.commit, we have a committed entry that's not applied yet.
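
A minimal sketch of how a caller could interpret these three values, assuming it lives alongside raft.go in the server package (where fmt is already imported); the helper name and messages are illustrative, only the Progress() signature comes from this diff:

// raftProgressSummary is a hypothetical helper, not part of this PR, that spells
// out the relationships described above between index, commit and applied.
func raftProgressSummary(n *raft) string {
	index, commit, applied := n.Progress()
	switch {
	case index == commit && commit == applied:
		// Every entry stored in the log has been committed and applied.
		return "log fully applied"
	case applied < commit:
		// Entries have been committed but not yet applied by the upper layer.
		return fmt.Sprintf("%d committed entries not yet applied", commit-applied)
	case commit < index:
		// Entries are stored in the WAL but not yet committed by the group.
		return fmt.Sprintf("%d stored entries not yet committed", index-commit)
	default:
		return fmt.Sprintf("index=%d commit=%d applied=%d", index, commit, applied)
	}
}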

Member:
Chalk up another ;)

return n.pindex, n.commit, n.applied
}

// Size returns number of entries and total bytes for our WAL.