Commit e79e505

NRG (2.11): Don't run catchup when behind on applies (#6216)
When a server restarts and is far enough behind, it needs to be caught up from a snapshot. If, after the follower receives that snapshot from the leader, the leader itself shuts down, the remaining server (in an R3 scenario) becomes leader. If this new leader is behind on applies, it should not fulfill the catchup request: messages that would be returned as part of the catchup might still be deleted by the unapplied append entries, and sending them to the follower would leave it unable to remove them via those append entries if they were meant to be deleted. Either way, the new leader is temporarily unable to fulfill the catchup request and must wait for its applies to reach the minimum required for the catchup response to be valid.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents de35c8b + 3392957 commit e79e505

4 files changed: +115 -9 lines changed
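
The mechanism described above reduces to a single comparison, which the diff below adds to runCatchup: the new leader stalls the catchup only while its log already holds at least MinApplied entries but its applies have not yet reached that point; if the log itself is too short, it serves the request anyway and values availability. A minimal standalone sketch of that decision (function and variable names here are illustrative, not the server's API):

package main

import "fmt"

// shouldStallCatchup mirrors the guard added to runCatchup in the diff below.
// index is the last entry stored in our raft log, applied is the last entry we
// have applied, and minApplied is the minimum the follower's sync request demands.
func shouldStallCatchup(index, applied, minApplied uint64) bool {
	// Stall only if the log has enough entries and they could still be applied.
	return index >= minApplied && applied < minApplied
}

func main() {
	fmt.Println(shouldStallCatchup(10, 5, 8)) // true: log is long enough, applies lag behind
	fmt.Println(shouldStallCatchup(10, 9, 8)) // false: already applied past the minimum
	fmt.Println(shouldStallCatchup(6, 5, 8))  // false: log too short; serve anyway (desync case)
}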

server/jetstream_cluster.go (+23 -6)
@@ -3303,7 +3303,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 			}

 			if isRecovering || !mset.IsLeader() {
-				if err := mset.processSnapshot(ss); err != nil {
+				if err := mset.processSnapshot(ss, ce.Index); err != nil {
 					return err
 				}
 			}
@@ -8343,11 +8343,12 @@ type streamSyncRequest struct {
 	FirstSeq       uint64 `json:"first_seq"`
 	LastSeq        uint64 `json:"last_seq"`
 	DeleteRangesOk bool   `json:"delete_ranges"`
+	MinApplied     uint64 `json:"min_applied"`
 }

 // Given a stream state that represents a snapshot, calculate the sync request based on our current state.
 // Stream lock must be held.
-func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState) *streamSyncRequest {
+func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState, index uint64) *streamSyncRequest {
 	// Shouldn't happen, but consequences are pretty bad if we have the lock held and
 	// our caller tries to take the lock again on panic defer, as in processSnapshot.
 	if state == nil || snap == nil || mset.node == nil {
@@ -8357,7 +8358,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplica
 	if state.LastSeq >= snap.LastSeq {
 		return nil
 	}
-	return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true}
+	return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true, MinApplied: index}
 }

 // processSnapshotDeletes will update our current store based on the snapshot
@@ -8493,15 +8494,15 @@ var (
 )

 // Process a stream snapshot.
-func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
+func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (e error) {
 	// Update any deletes, etc.
 	mset.processSnapshotDeletes(snap)
 	mset.setCLFS(snap.Failed)

 	mset.mu.Lock()
 	var state StreamState
 	mset.store.FastState(&state)
-	sreq := mset.calculateSyncRequest(&state, snap)
+	sreq := mset.calculateSyncRequest(&state, snap, index)

 	s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
 	qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
@@ -8639,7 +8640,7 @@ RETRY:
 	mset.mu.RLock()
 	var state StreamState
 	mset.store.FastState(&state)
-	sreq = mset.calculateSyncRequest(&state, snap)
+	sreq = mset.calculateSyncRequest(&state, snap, index)
 	mset.mu.RUnlock()
 	if sreq == nil {
 		return nil
@@ -9187,6 +9188,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {

 	// Setup sequences to walk through.
 	seq, last := sreq.FirstSeq, sreq.LastSeq
+
+	// The follower received a snapshot from another leader, and we've become leader since.
+	// We have an up-to-date log but could be behind on applies. We must wait until we've reached the minimum required.
+	// The follower will automatically retry after a timeout, so we can safely return here.
+	if node := mset.raftNode(); node != nil {
+		index, _, applied := node.Progress()
+		// Only skip if our log has enough entries, and they could be applied in the future.
+		if index >= sreq.MinApplied && applied < sreq.MinApplied {
+			return
+		}
+		// We know here we've either applied enough entries, or our log doesn't have enough entries.
+		// In the latter case the request expects us to have more. Just continue and value availability here.
+		// This should only be possible if the logs have already desynced, and we shouldn't have become leader
+		// in the first place. Not much we can do here in this (hypothetical) scenario.
+	}
+
 	mset.setCatchupPeer(sreq.Peer, last-seq)

 	// Check if we can compress during this.

server/jetstream_cluster_1_test.go (+88)
@@ -6815,6 +6815,94 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
 	}
 }

+func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+
+	// Reconnect to stream leader.
+	l := c.streamLeader(globalAccountName, "TEST")
+	nc.Close()
+	nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
+	defer nc.Close()
+
+	// Setup wiretap and grab stream.
+	sendSubject := "test-wiretap"
+	sub, err := nc.SubscribeSync(sendSubject)
+	require_NoError(t, err)
+	err = nc.Flush() // Must flush, otherwise our subscription could be too late.
+	require_NoError(t, err)
+	acc, err := l.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+
+	// We have a message at sequence 1, so expect a successful catchup.
+	sreq1 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true}
+	require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq1) }))
+	// Expect the message at sequence 1.
+	msg, err := sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
+	subj, _, _, _, seq, _, err := decodeStreamMsg(msg.Data[1:])
+	require_NoError(t, err)
+	require_Equal(t, seq, 1)
+	require_Equal(t, subj, "foo")
+	// And end with EOF.
+	msg, err = sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Len(t, len(msg.Data), 0)
+
+	// Add one additional entry into the log that's not applied yet.
+	n := mset.node.(*raft)
+	n.Lock()
+	ae := n.buildAppendEntry(nil)
+	err = n.storeToWAL(ae)
+	n.Unlock()
+	index, commit, applied := n.Progress()
+	require_NoError(t, err)
+	require_LessThan(t, applied, index)
+	require_Equal(t, commit, applied)
+	// We have a message at sequence 1, but we haven't applied as many append entries.
+	// We can't fulfill the request right now as we don't know yet if
+	// that message will be deleted as part of upcoming append entries.
+	sreq2 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: index}
+	require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq2) }))
+	_, err = sub.NextMsg(time.Second)
+	require_Error(t, err, nats.ErrTimeout)
+
+	// We have a message at sequence 1, but we haven't applied as many append entries.
+	// Also, we seem to have a log that doesn't contain enough entries, even though we became leader.
+	// Something has already gone wrong and got the logs to desync.
+	// Value availability here and just fulfill the request.
+	sreq3 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: 100}
+	require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq3) }))
+	// Expect the message at sequence 1.
+	msg, err = sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
+	subj, _, _, _, seq, _, err = decodeStreamMsg(msg.Data[1:])
+	require_NoError(t, err)
+	require_Equal(t, seq, 1)
+	require_Equal(t, subj, "foo")
+	// And end with EOF.
+	msg, err = sub.NextMsg(time.Second)
+	require_NoError(t, err)
+	require_Len(t, len(msg.Data), 0)
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

server/jetstream_cluster_4_test.go (+3 -2)
@@ -3997,8 +3997,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
 			// Processing a snapshot while there's no leader elected is considered a cluster reset.
 			// If a leader is temporarily unavailable we shouldn't blow away our state.
 			var snap StreamReplicatedState
-			snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
-			err := mset.processSnapshot(&snap)
+			snap.LastSeq = 1_000      // ensure we can catchup based on the snapshot
+			appliedIndex := uint64(0) // incorrect index, but doesn't matter for this test
+			err := mset.processSnapshot(&snap, appliedIndex)
 			require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
 			require_True(t, isClusterResetErr(err))
 			mset.resetClusteredState(err)

server/raft.go (+1 -1)
@@ -1651,7 +1651,7 @@ func (n *raft) State() RaftState {
 func (n *raft) Progress() (index, commit, applied uint64) {
 	n.RLock()
 	defer n.RUnlock()
-	return n.pindex + 1, n.commit, n.applied
+	return n.pindex, n.commit, n.applied
 }

 // Size returns number of entries and total bytes for our WAL.
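
Progress() now reports n.pindex, which appears to be the index of the last entry stored in the WAL, rather than one past it. That presumably keeps the new index >= sreq.MinApplied check in runCatchup honest: applied can never exceed the last stored index, so a leader whose log simply lacks the requested entries is not mistaken for one that merely has not applied them yet. A small self-contained illustration of that edge case; the values and the stall helper are made up for the example, simplified from the guard above:

package main

import "fmt"

// stall reproduces the runCatchup guard for illustration only.
func stall(index, applied, minApplied uint64) bool {
	return index >= minApplied && applied < minApplied
}

func main() {
	// The leader's log ends at entry 7 and is fully applied; the follower asks for MinApplied = 8.
	const lastStored, applied, minApplied = 7, 7, 8

	// Reporting the last stored index: the log is recognized as too short,
	// so the leader serves the request and values availability.
	fmt.Println(stall(lastStored, applied, minApplied)) // false

	// Reporting one past it (the previous behaviour) would make the leader stall
	// even though it has applied everything it has stored.
	fmt.Println(stall(lastStored+1, applied, minApplied)) // true
}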
