From d22ae09093cad48eeda339ec1c1d842783fb4c5e Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Wed, 5 Feb 2025 14:05:22 +0100
Subject: [PATCH] [FIXED] Desync after quit during catchup

Signed-off-by: Maurice van Veen
---
 server/jetstream_cluster.go        |  2 +-
 server/jetstream_cluster_4_test.go | 90 ++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+), 1 deletion(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index a358d663a69..db2455dc20a 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2511,7 +2511,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 					ce.ReturnToPool()
 				} else {
 					// Our stream was closed out from underneath of us, simply return here.
-					if err == errStreamClosed {
+					if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning {
 						aq.recycle(&ces)
 						return
 					}
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 0e4f1920ba8..ff0e167665c 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -4111,6 +4111,96 @@ func TestJetStreamClusterConsumerReplicasAfterScale(t *testing.T) {
 	require_Equal(t, len(ci.Cluster.Replicas), 2)
 }
 
+func TestJetStreamClusterDesyncAfterQuitDuringCatchup(t *testing.T) {
+	for title, test := range map[string]func(s *Server, rn RaftNode){
+		"RAFT": func(s *Server, rn RaftNode) {
+			rn.Stop()
+			rn.WaitForStop()
+		},
+		"server": func(s *Server, rn RaftNode) {
+			s.running.Store(false)
+		},
+	} {
+		t.Run(title, func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
+
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
+
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:     "TEST",
+				Subjects: []string{"foo"},
+				Replicas: 3,
+			})
+			require_NoError(t, err)
+
+			// Wait for all servers to have applied everything up to this point.
+			checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+				for _, s := range c.servers {
+					acc, err := s.lookupAccount(globalAccountName)
+					if err != nil {
+						return err
+					}
+					mset, err := acc.lookupStream("TEST")
+					if err != nil {
+						return err
+					}
+					_, _, applied := mset.raftNode().Progress()
+					if applied != 1 {
+						return fmt.Errorf("expected applied to be %d, got %d", 1, applied)
+					}
+				}
+				return nil
+			})
+
+			rs := c.randomNonStreamLeader(globalAccountName, "TEST")
+			acc, err := rs.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			mset, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+
+			rn := mset.raftNode()
+			snap, err := json.Marshal(streamSnapshot{Msgs: 1, Bytes: 1, FirstSeq: 100, LastSeq: 100, Failed: 0, Deleted: nil})
+			require_NoError(t, err)
+			esm := encodeStreamMsgAllowCompress("foo", _EMPTY_, nil, nil, 0, 0, false)
+
+			// Lock stream so that we can go into processSnapshot but must wait for this to unlock.
+			mset.mu.Lock()
+			var unlocked bool
+			defer func() {
+				if !unlocked {
+					mset.mu.Unlock()
+				}
+			}()
+
+			_, err = rn.ApplyQ().push(newCommittedEntry(100, []*Entry{newEntry(EntrySnapshot, snap)}))
+			require_NoError(t, err)
+			_, err = rn.ApplyQ().push(newCommittedEntry(101, []*Entry{newEntry(EntryNormal, esm)}))
+			require_NoError(t, err)
+
+			// Waiting for the apply queue entry to be captured in monitorStream first.
+			time.Sleep(time.Second)
+
+			// Set commit to a very high number, just so that we allow upping Applied()
+			n := rn.(*raft)
+			n.Lock()
+			n.commit = 1000
+			n.Unlock()
+
+			// Now stop the underlying RAFT node/server so processSnapshot must exit because of it.
+			test(rs, rn)
+			mset.mu.Unlock()
+			unlocked = true
+
+			// Allow some time for the applied number to be updated, in which case it's an error.
+			time.Sleep(time.Second)
+			_, _, applied := mset.raftNode().Progress()
+			require_Equal(t, applied, 1)
+		})
+	}
+}
+
 func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
 	tests := []struct {
 		title string