Skip to content

Commit a4711ee

Browse files
NRG: Don't run catchup when behind on applies
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 436340a commit a4711ee

4 files changed

+85
-8
lines changed

server/jetstream_cluster.go

+15-6
Original file line number | Diff line number | Diff line change
@@ -3303,7 +3303,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
33033303
}
33043304

33053305
if isRecovering || !mset.IsLeader() {
3306-
if err := mset.processSnapshot(ss); err != nil {
3306+
if err := mset.processSnapshot(ss, ce.Index); err != nil {
33073307
return err
33083308
}
33093309
}
@@ -8343,11 +8343,12 @@ type streamSyncRequest struct {
83438343
FirstSeq uint64 `json:"first_seq"`
83448344
LastSeq uint64 `json:"last_seq"`
83458345
DeleteRangesOk bool `json:"delete_ranges"`
8346+
MinApplied uint64 `json:"min_applied"`
83468347
}
83478348

83488349
// Given a stream state that represents a snapshot, calculate the sync request based on our current state.
83498350
// Stream lock must be held.
8350-
func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState) *streamSyncRequest {
8351+
func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplicatedState, index uint64) *streamSyncRequest {
83518352
// Shouldn't happen, but consequences are pretty bad if we have the lock held and
83528353
// our caller tries to take the lock again on panic defer, as in processSnapshot.
83538354
if state == nil || snap == nil || mset.node == nil {
@@ -8357,7 +8358,7 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *StreamReplica
83578358
if state.LastSeq >= snap.LastSeq {
83588359
return nil
83598360
}
8360-
return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true}
8361+
return &streamSyncRequest{FirstSeq: state.LastSeq + 1, LastSeq: snap.LastSeq, Peer: mset.node.ID(), DeleteRangesOk: true, MinApplied: index}
83618362
}
83628363

83638364
// processSnapshotDeletes will update our current store based on the snapshot
@@ -8493,15 +8494,15 @@ var (
84938494
)
84948495

84958496
// Process a stream snapshot.
8496-
func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
8497+
func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) (e error) {
84978498
// Update any deletes, etc.
84988499
mset.processSnapshotDeletes(snap)
84998500
mset.setCLFS(snap.Failed)
85008501

85018502
mset.mu.Lock()
85028503
var state StreamState
85038504
mset.store.FastState(&state)
8504-
sreq := mset.calculateSyncRequest(&state, snap)
8505+
sreq := mset.calculateSyncRequest(&state, snap, index)
85058506

85068507
s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
85078508
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
@@ -8639,7 +8640,7 @@ RETRY:
86398640
mset.mu.RLock()
86408641
var state StreamState
86418642
mset.store.FastState(&state)
8642-
sreq = mset.calculateSyncRequest(&state, snap)
8643+
sreq = mset.calculateSyncRequest(&state, snap, index)
86438644
mset.mu.RUnlock()
86448645
if sreq == nil {
86458646
return nil
@@ -9187,6 +9188,14 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
91879188

91889189
// Setup sequences to walk through.
91899190
seq, last := sreq.FirstSeq, sreq.LastSeq
9191+
9192+
// The follower received a snapshot from another leader, and we've become leader since.
9193+
// We have an up-to-date log but are behind on applies. We must wait until we've reached the minimum required.
9194+
// The follower will automatically retry after a timeout, so we can safely return here.
9195+
if node := mset.raftNode(); node != nil && !node.HasApplied(sreq.MinApplied) {
9196+
return
9197+
}
9198+
91909199
mset.setCatchupPeer(sreq.Peer, last-seq)
91919200

91929201
// Check if we can compress during this.

server/jetstream_cluster_1_test.go

+59
Original file line number | Diff line number | Diff line change
@@ -6815,6 +6815,65 @@ func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
68156815
}
68166816
}
68176817

6818+
func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
6819+
c := createJetStreamClusterExplicit(t, "R3S", 3)
6820+
defer c.shutdown()
6821+
6822+
nc, js := jsClientConnect(t, c.randomServer())
6823+
defer nc.Close()
6824+
6825+
_, err := js.AddStream(&nats.StreamConfig{
6826+
Name: "TEST",
6827+
Subjects: []string{"foo"},
6828+
Replicas: 3,
6829+
})
6830+
require_NoError(t, err)
6831+
6832+
_, err = js.Publish("foo", nil)
6833+
require_NoError(t, err)
6834+
6835+
// Reconnect to stream leader.
6836+
l := c.streamLeader(globalAccountName, "TEST")
6837+
nc.Close()
6838+
nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
6839+
defer nc.Close()
6840+
6841+
// Setup wiretap and grab stream.
6842+
sendSubject := "test-wiretap"
6843+
sub, err := nc.SubscribeSync(sendSubject)
6844+
require_NoError(t, err)
6845+
err = nc.Flush() // Must flush, otherwise our subscription could be too late.
6846+
require_NoError(t, err)
6847+
acc, err := l.lookupAccount(globalAccountName)
6848+
require_NoError(t, err)
6849+
mset, err := acc.lookupStream("TEST")
6850+
require_NoError(t, err)
6851+
6852+
// We have a message at sequence 1, so expect a successful catchup.
6853+
sreq1 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true}
6854+
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq1) }))
6855+
// Expect the message at sequence 1.
6856+
msg, err := sub.NextMsg(time.Second)
6857+
require_NoError(t, err)
6858+
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
6859+
subj, _, _, _, seq, _, err := decodeStreamMsg(msg.Data[1:])
6860+
require_NoError(t, err)
6861+
require_Equal(t, seq, 1)
6862+
require_Equal(t, subj, "foo")
6863+
// And end with EOF.
6864+
msg, err = sub.NextMsg(time.Second)
6865+
require_NoError(t, err)
6866+
require_Len(t, len(msg.Data), 0)
6867+
6868+
// We have a message at sequence 1, but we haven't applied as many append entries.
6869+
// We can't fulfill the request right now as we don't know yet if
6870+
// that message will be deleted as part of upcoming append entries.
6871+
sreq3 := &streamSyncRequest{Peer: "peer", FirstSeq: 1, LastSeq: 1, DeleteRangesOk: true, MinApplied: 100}
6872+
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq3) }))
6873+
_, err = sub.NextMsg(time.Second)
6874+
require_Error(t, err, nats.ErrTimeout)
6875+
}
6876+
68186877
//
68196878
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
68206879
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

server/jetstream_cluster_4_test.go

+3-2
Original file line number | Diff line number | Diff line change
@@ -3997,8 +3997,9 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
39973997
// Processing a snapshot while there's no leader elected is considered a cluster reset.
39983998
// If a leader is temporarily unavailable we shouldn't blow away our state.
39993999
var snap StreamReplicatedState
4000-
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
4001-
err := mset.processSnapshot(&snap)
4000+
snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot
4001+
appliedIndex := uint64(0) // incorrect index, but doesn't matter for this test
4002+
err := mset.processSnapshot(&snap, appliedIndex)
40024003
require_True(t, errors.Is(err, errCatchupAbortedNoLeader))
40034004
require_True(t, isClusterResetErr(err))
40044005
mset.resetClusteredState(err)

server/raft.go

+8
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ type RaftNode interface {
4444
SendSnapshot(snap []byte) error
4545
NeedSnapshot() bool
4646
Applied(index uint64) (entries uint64, bytes uint64)
47+
HasApplied(index uint64) bool
4748
State() RaftState
4849
Size() (entries, bytes uint64)
4950
Progress() (index, commit, applied uint64)
@@ -1084,6 +1085,13 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
10841085
return entries, bytes
10851086
}
10861087

1088+
// HasApplied returns whether a minimum index has been applied.
1089+
func (n *raft) HasApplied(index uint64) bool {
1090+
n.RLock()
1091+
defer n.RUnlock()
1092+
return n.applied >= index
1093+
}
1094+
10871095
// For capturing data needed by snapshot.
10881096
type snapshot struct {
10891097
lastTerm uint64

0 commit comments

Comments (0)