From d3ab9f83dfc65b0dde8a5975da55ad046ab2de35 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 16:00:20 +0200 Subject: [PATCH 01/12] (2.11) ADR-44: JetStream Asset Versioning in Metadata Signed-off-by: Maurice van Veen --- server/jetstream.go | 1 + server/jetstream_api.go | 24 +- server/jetstream_cluster.go | 11 + server/jetstream_versioning.go | 99 +++++ server/jetstream_versioning_test.go | 386 ++++++++++++++++++ .../restore_empty_R1F_stream/backup.json | 35 ++ .../restore_empty_R1F_stream/stream.tar.s2 | Bin 0 -> 861 bytes .../restore_empty_R3F_stream/backup.json | 35 ++ .../restore_empty_R3F_stream/stream.tar.s2 | Bin 0 -> 861 bytes 9 files changed, 589 insertions(+), 2 deletions(-) create mode 100644 server/jetstream_versioning.go create mode 100644 server/jetstream_versioning_test.go create mode 100644 test/configs/jetstream/restore_empty_R1F_stream/backup.json create mode 100644 test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 create mode 100644 test/configs/jetstream/restore_empty_R3F_stream/backup.json create mode 100644 test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 diff --git a/server/jetstream.go b/server/jetstream.go index 3a986cccd4a..3b9132be4f6 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -467,6 +467,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Noticef(" TPM File: %q, Pcr: %d", opts.JetStreamTpm.KeysFile, opts.JetStreamTpm.Pcr) } + s.Noticef(" API Level: %s", JSApiLevel) s.Noticef("-------------------------------------------") // Setup our internal subscriptions. diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f33c4b3953f..dbd93015d24 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1422,6 +1422,9 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } + // Initialize asset version metadata. + setStreamAssetVersionMetadata(&cfg.StreamConfig, nil) + streamName := streamNameFromSubject(subject) if streamName != cfg.Name { resp.Error = NewJSStreamMismatchError() @@ -1557,6 +1560,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } + // Update asset version metadata. + setStreamAssetVersionMetadata(&cfg, &mset.cfg) + if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -4028,12 +4034,17 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun return } - // If the consumer already exists then don't allow updating the PauseUntil, just set - // it back to whatever the current configured value is. + var oldCfg *ConsumerConfig if o := stream.lookupConsumer(consumerName); o != nil { + oldCfg = &o.cfg + // If the consumer already exists then don't allow updating the PauseUntil, just set + // it back to whatever the current configured value is. req.Config.PauseUntil = o.cfg.PauseUntil } + // Initialize/update asset version metadata. + setConsumerAssetVersionMetadata(&req.Config, oldCfg) + o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic) if err != nil { @@ -4589,6 +4600,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } else { nca.Config.PauseUntil = nil } + + // Update asset version metadata due to updating pause/resume. + // Only PauseUntil is updated above, so reuse config for both. + setConsumerAssetVersionMetadata(nca.Config, nca.Config) + eca := encodeAddConsumerAssignment(&nca) cc.meta.Propose(eca) @@ -4622,6 +4638,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account ncfg.PauseUntil = nil } + // Update asset version metadata due to updating pause/resume. + // Only PauseUntil is updated above, so reuse config for both. + setConsumerAssetVersionMetadata(&ncfg, &ncfg) + if err := obs.updateConfig(&ncfg); err != nil { // The only type of error that should be returned here is from o.store, // so use a store failed error type. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a59ae021234..64f142e0942 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6244,6 +6244,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } + + // Update asset version metadata. + setStreamAssetVersionMetadata(cfg, osa.Config) + var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { js.mu.Unlock() @@ -7296,6 +7300,13 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } } + // Initialize/update asset version metadata. + var oldCfg *ConsumerConfig + if ca != nil { + oldCfg = ca.Config + } + setConsumerAssetVersionMetadata(cfg, oldCfg) + // If this is new consumer. if ca == nil { if action == ActionUpdate { diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go new file mode 100644 index 00000000000..57ca0506509 --- /dev/null +++ b/server/jetstream_versioning.go @@ -0,0 +1,99 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +const ( + // JSApiLevel is the maximum supported JetStream API level for this server. + JSApiLevel = "1" + + JSCreatedVersionMetadataKey = "_nats.created.server.version" + JSCreatedLevelMetadataKey = "_nats.created.server.api_level" + JSRequiredLevelMetadataKey = "_nats.server.require.api_level" +) + +// setStreamAssetVersionMetadata sets JetStream stream metadata, like the server version and API level. +// Given: +// - cfg!=nil, prevCfg==nil add stream: adds created and required metadata +// - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated +func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + + var prevMetadata map[string]string + if prevCfg != nil { + prevMetadata = prevCfg.Metadata + if prevMetadata == nil { + // Initialize to empty to indicate we had a previous config but metadata was missing. + prevMetadata = make(map[string]string) + } + } + preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + + requiredApiLevel := "0" + cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel +} + +// setConsumerAssetVersionMetadata sets JetStream consumer metadata, like the server version and API level. +// Given: +// - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata +// - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated +func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + + var prevMetadata map[string]string + if prevCfg != nil { + prevMetadata = prevCfg.Metadata + if prevMetadata == nil { + // Initialize to empty to indicate we had a previous config but metadata was missing. + prevMetadata = make(map[string]string) + } + } + preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + + requiredApiLevel := "0" + + // Added in 2.11, absent | zero is the feature is not used. + // one could be stricter and say even if its set but the time + // has already passed it is also not needed to restore the consumer + if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() { + requiredApiLevel = "1" + } + + cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel +} + +// preserveAssetCreatedVersionMetadata sets metadata to contain which version and API level the asset was created on. +// Preserves previous metadata, if not set it initializes versions for the metadata. +func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]string) { + if prevMetadata == nil { + metadata[JSCreatedVersionMetadataKey] = VERSION + metadata[JSCreatedLevelMetadataKey] = JSApiLevel + return + } + + // Preserve previous metadata if it was set, but delete if not since it could be user-provided. + if v := prevMetadata[JSCreatedVersionMetadataKey]; v != _EMPTY_ { + metadata[JSCreatedVersionMetadataKey] = v + } else { + delete(metadata, JSCreatedVersionMetadataKey) + } + if v := prevMetadata[JSCreatedLevelMetadataKey]; v != _EMPTY_ { + metadata[JSCreatedLevelMetadataKey] = v + } else { + delete(metadata, JSCreatedLevelMetadataKey) + } +} diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go new file mode 100644 index 00000000000..2a4dbca1da1 --- /dev/null +++ b/server/jetstream_versioning_test.go @@ -0,0 +1,386 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "encoding/json" + "fmt" + "github.com/nats-io/nats.go" + "io" + "os" + "testing" + "time" +) + +func metadataAllSet(featureLevel string) map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: VERSION, + JSCreatedLevelMetadataKey: JSApiLevel, + JSRequiredLevelMetadataKey: featureLevel, + } +} + +func metadataPrevious() map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: "previous-version", + JSCreatedLevelMetadataKey: "previous-level", + JSRequiredLevelMetadataKey: "previous-level", + } +} + +func metadataUpdatedPrevious(featureLevel string) map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: "previous-version", + JSCreatedLevelMetadataKey: "previous-level", + JSRequiredLevelMetadataKey: featureLevel, + } +} + +func metadataEmpty() map[string]string { + return map[string]string{ + JSRequiredLevelMetadataKey: "0", + } +} + +func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { + for _, test := range []struct { + desc string + cfg *StreamConfig + prev *StreamConfig + expectedMetadata map[string]string + }{ + { + desc: "create", + cfg: &StreamConfig{}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/overwrite-user-provided", + cfg: &StreamConfig{Metadata: metadataPrevious()}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "update", + cfg: &StreamConfig{}, + prev: &StreamConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "update/empty-prev-metadata", + cfg: &StreamConfig{}, + prev: &StreamConfig{}, + expectedMetadata: metadataEmpty(), + }, + { + desc: "update/empty-prev-metadata/delete-user-provided", + cfg: &StreamConfig{Metadata: metadataPrevious()}, + prev: &StreamConfig{}, + expectedMetadata: metadataEmpty(), + }, + } { + t.Run(test.desc, func(t *testing.T) { + setStreamAssetVersionMetadata(test.cfg, test.prev) + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) + }) + } +} + +func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { + pauseUntil := time.Unix(0, 0) + pauseUntilZero := time.Time{} + for _, test := range []struct { + desc string + cfg *ConsumerConfig + prev *ConsumerConfig + expectedMetadata map[string]string + }{ + { + desc: "create", + cfg: &ConsumerConfig{}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/PauseUntil/zero", + cfg: &ConsumerConfig{PauseUntil: &pauseUntilZero}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/PauseUntil", + cfg: &ConsumerConfig{PauseUntil: &pauseUntil}, + prev: nil, + expectedMetadata: metadataAllSet("1"), + }, + { + desc: "create/overwrite-user-provided", + cfg: &ConsumerConfig{Metadata: metadataPrevious()}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "update", + cfg: &ConsumerConfig{}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "create/PauseUntil/zero", + cfg: &ConsumerConfig{PauseUntil: &pauseUntilZero}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "update/PauseUntil", + cfg: &ConsumerConfig{PauseUntil: &pauseUntil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("1"), + }, + { + desc: "update/empty-prev-metadata", + cfg: &ConsumerConfig{}, + prev: &ConsumerConfig{}, + expectedMetadata: metadataEmpty(), + }, + { + desc: "update/empty-prev-metadata/delete-user-provided", + cfg: &ConsumerConfig{Metadata: metadataPrevious()}, + prev: &ConsumerConfig{}, + expectedMetadata: metadataEmpty(), + }, + } { + t.Run(test.desc, func(t *testing.T) { + setConsumerAssetVersionMetadata(test.cfg, test.prev) + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) + }) + } +} + +type server struct { + replicas int + js nats.JetStreamContext + nc *nats.Conn +} + +const ( + streamName = "STREAM" + consumerName = "CONSUMER" +) + +func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { + single := RunBasicJetStreamServer(t) + defer single.Shutdown() + nc, js := jsClientConnect(t, single) + defer nc.Close() + + cluster := createJetStreamClusterExplicit(t, "R3S", 3) + defer cluster.shutdown() + cnc, cjs := jsClientConnect(t, cluster.randomServer()) + defer cnc.Close() + + // Test for both single server and clustered mode. + for _, s := range []server{ + {1, js, nc}, + {3, cjs, cnc}, + } { + t.Run(fmt.Sprintf("R%d", s.replicas), func(t *testing.T) { + streamAssetVersionChecks(t, s) + consumerAssetVersionChecks(t, s) + }) + } +} + +func streamAssetVersionChecks(t *testing.T, s server) { + // Add stream. + sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} + si, err := s.js.AddStream(&sc) + require_NoError(t, err) + require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Stream info. + si, err = s.js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Update stream. + // Metadata set on creation should be preserved, even if not included in update. + si, err = s.js.UpdateStream(&sc) + require_NoError(t, err) + require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") +} + +func consumerAssetVersionChecks(t *testing.T, s server) { + // Add consumer. + cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} + ci, err := s.js.AddConsumer(streamName, &cc) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Consumer info. + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Update consumer. + // Metadata set on creation should be preserved, even if not included in update. + ci, err = s.js.UpdateConsumer(streamName, &cc) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Use pause advisories to know when pause/resume is applied. + pauseCh := make(chan *nats.Msg, 10) + _, err = s.nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".STREAM.CONSUMER", pauseCh) + require_NoError(t, err) + + // Pause consumer, should up required API level. + jsTestPause_PauseConsumer(t, s.nc, streamName, consumerName, time.Now().Add(time.Second*3)) + require_ChanRead(t, pauseCh, time.Second*2) + require_Len(t, len(pauseCh), 0) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "1") + + // Unpause consumer, should lower required API level. + subj := fmt.Sprintf("$JS.API.CONSUMER.PAUSE.%s.%s", streamName, consumerName) + _, err = s.nc.Request(subj, nil, time.Second) + require_NoError(t, err) + require_ChanRead(t, pauseCh, time.Second*2) + require_Len(t, len(pauseCh), 0) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") +} + +func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + path := "../test/configs/jetstream/restore_empty_R1F_stream" + restoreStreamFromPath(t, nc, path) + + // Stream restore should result in empty metadata to be preserved. + si, err := js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) + + // Restart server. + port := s.opts.Port + sd := s.StoreDir() + nc.Close() + s.Shutdown() + s = RunJetStreamServerOnPort(port, sd) + defer s.Shutdown() + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // After restart (or upgrade) metadata data should remain empty. + si, err = js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) +} + +func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + path := "../test/configs/jetstream/restore_empty_R3F_stream" + restoreStreamFromPath(t, nc, path) + + // Stream restore should result in empty metadata to be preserved. + si, err := js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) + + // Restart cluster. + c.stopAll() + c.restartAllSamePorts() + defer c.shutdown() + c.waitOnAllCurrent() + c.waitOnStreamLeader("$G", streamName) + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // After restart (or upgrade) metadata data should remain empty. + si, err = js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) +} + +func restoreStreamFromPath(t *testing.T, nc *nats.Conn, path string) { + var rreq JSApiStreamRestoreRequest + buf, err := os.ReadFile(fmt.Sprintf("%s/backup.json", path)) + require_NoError(t, err) + err = json.Unmarshal(buf, &rreq) + require_NoError(t, err) + + data, err := os.Open(fmt.Sprintf("%s/stream.tar.s2", path)) + require_NoError(t, err) + defer data.Close() + + var rresp JSApiStreamRestoreResponse + msg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, rreq.Config.Name), buf, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + + var chunk [1024]byte + for { + n, err := data.Read(chunk[:]) + if err == io.EOF { + break + } + require_NoError(t, err) + + msg, err = nc.Request(rresp.DeliverSubject, chunk[:n], 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + } + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) + require_NoError(t, err) + err = json.Unmarshal(msg.Data, &rresp) + require_NoError(t, err) +} diff --git a/test/configs/jetstream/restore_empty_R1F_stream/backup.json b/test/configs/jetstream/restore_empty_R1F_stream/backup.json new file mode 100644 index 00000000000..ebc48e723c4 --- /dev/null +++ b/test/configs/jetstream/restore_empty_R1F_stream/backup.json @@ -0,0 +1,35 @@ +{ + "config": { + "name": "STREAM", + "subjects": [ + "stream" + ], + "retention": "limits", + "max_consumers": -1, + "max_msgs_per_subject": -1, + "max_msgs": -1, + "max_bytes": -1, + "max_age": 0, + "max_msg_size": -1, + "storage": "file", + "discard": "old", + "num_replicas": 1, + "duplicate_window": 120000000000, + "sealed": false, + "deny_delete": false, + "deny_purge": false, + "allow_rollup_hdrs": false, + "allow_direct": true, + "mirror_direct": false, + "consumer_limits": {} + }, + "state": { + "messages": 0, + "bytes": 0, + "first_seq": 0, + "first_ts": "0001-01-01T00:00:00Z", + "last_seq": 0, + "last_ts": "0001-01-01T00:00:00Z", + "consumer_count": 0 + } +} \ No newline at end of file diff --git a/test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 b/test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 new file mode 100644 index 0000000000000000000000000000000000000000..a0e9234d609bdb7cd8cc5d767438ac54723b5b6f GIT binary patch literal 861 zcmY+Czi-n(6vtmeLLC}tBI?1VL6t>Ok&KZ7hq$u&PR3T&TT#&AOo%@|!zfG?sP9KT$Mv|JO1ER) zWzwd{dBjx}wW%*;Eg1Egs}4=LEL?Y7=F4%dVaRv7d|BI@yFS4q-|u#u6_*QjLJvE~Za{qR22lorlW}7(cVI&o014P@t~CN;>)B8Jx@9kxdB81<7SBV+#}w z4#Ls)r2G&{Yq(_?CdD=Db*h-x-~$w7SmH3wDx-2(wpMkUQpMgLn1TFBDCDt)q z$gTu1i;raPgOVXwg@L?D5YnItnZZ=$rIJcPb}*YQ%Vh@h$$^ZZW literal 0 HcmV?d00001 diff --git a/test/configs/jetstream/restore_empty_R3F_stream/backup.json b/test/configs/jetstream/restore_empty_R3F_stream/backup.json new file mode 100644 index 00000000000..f9bdb2d49dc --- /dev/null +++ b/test/configs/jetstream/restore_empty_R3F_stream/backup.json @@ -0,0 +1,35 @@ +{ + "config": { + "name": "STREAM", + "subjects": [ + "stream" + ], + "retention": "limits", + "max_consumers": -1, + "max_msgs_per_subject": -1, + "max_msgs": -1, + "max_bytes": -1, + "max_age": 0, + "max_msg_size": -1, + "storage": "file", + "discard": "old", + "num_replicas": 3, + "duplicate_window": 120000000000, + "sealed": false, + "deny_delete": false, + "deny_purge": false, + "allow_rollup_hdrs": false, + "allow_direct": true, + "mirror_direct": false, + "consumer_limits": {} + }, + "state": { + "messages": 0, + "bytes": 0, + "first_seq": 0, + "first_ts": "0001-01-01T00:00:00Z", + "last_seq": 0, + "last_ts": "0001-01-01T00:00:00Z", + "consumer_count": 0 + } +} \ No newline at end of file diff --git a/test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 b/test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 new file mode 100644 index 0000000000000000000000000000000000000000..2775b7461ce8640229e054747c0df9547fe90698 GIT binary patch literal 861 zcmY*XyKmD#82=Iy;y@sYS`RJ_iY%x=O49TBm6(@kp+Ln;ZRh|(#)&V*;77P~lm=7; zNQfDUftdjbF|op*0IX~*EC?h}5DQ`{=N1OW@9uZM*L@q?0hTdq4==!lZE(KW{&w#~ z`=YaUnm8_@tbt|&5<3kq>_Ist!cjnZ0LbSu2=X8bC4^K1vXIa01EeU1ra^VB{D=cr z&cXD6#SR68L^x8O7vWi~qG}L(0Xd?WSZnE~iS>rAtGZ$~Rc(g|mGXWGUcs%OBAmh# zY0i$2F&ZBg;2JWK4vJRb+MnhMohPei!%7z&{iGZ7~5R^j2B%E#U!l&3^k zAK$+t&r{;W#FZ@>Bdpa?vyNMBrD^H9r8FDZ)HT%9H1o1tlS3yU9JkzFI(hsYpV^>u zgLGpiTUTTjbDluHT9YY>Nf>)k$f3R$c>KO52hN(^jY2jEcyF>*SGJAxntn1LkXym(zdK}TsCvwm8Nn&ph*p&2r zuj_CLwI;iR%^@cC&69&ikqoM73l&6xdm<|ZH z=}{U{QA9n*XMASF@rf%I54a>8*)H*UTw@Xn{Q+H-Ci2$~(8%`(efzpgnK*U!UugSp z71o{g^{>n61rP*Y#;5bJ(t`bG*7sN?SONh|39hG;AD+Yc%v}ybm@h~^V<_K2kYEN5 z6V+^XNKfze^p@U5#CV-57bN%qC0-Vr#aUsRhb3zbx5HViac&L@3s5Rx199?hTHvTe zU>+aJJOCl1VG(i#RYORE4rF$w>Mw;<3bH%1*&1JFP@n$(F&sUAzpGfzO7IrA^ODl& q_&1gBd~?5S%>63v6@ Date: Fri, 30 Aug 2024 17:16:23 +0200 Subject: [PATCH 02/12] Lint imports Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 2a4dbca1da1..2b72acb047d 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -16,11 +16,12 @@ package server import ( "encoding/json" "fmt" - "github.com/nats-io/nats.go" "io" "os" "testing" "time" + + "github.com/nats-io/nats.go" ) func metadataAllSet(featureLevel string) map[string]string { From 7fa3b13a1f829746d439d7822226e5546ff47c19 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 17:23:37 +0200 Subject: [PATCH 03/12] Fix for TestJetStreamClusterConsumerActions Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 64f142e0942..cc1f07379c2 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7286,6 +7286,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // if name was set by the user. if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { + // Provided config might miss metadata, include from existing config. + setConsumerAssetVersionMetadata(cfg, ca.Config) + if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) From 2c3f9984d57f455748187eb10cbc94a08810b2b5 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 17:26:56 +0200 Subject: [PATCH 04/12] Skip JS versioning tests when running other tests from the server package Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 2b72acb047d..0bf9634aec6 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -11,6 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !skip_js_tests +// +build !skip_js_tests + package server import ( From 1b70d9cdb1c352e69a81eb75d08c60a5e0caf434 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 18:07:12 +0200 Subject: [PATCH 05/12] More comprehensive fix when doing equality check on cfg and prevCfg Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 4 +- server/jetstream_versioning.go | 37 +++++++++++++ server/jetstream_versioning_test.go | 80 +++++++++++++++++++++++++++-- 3 files changed, 114 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index cc1f07379c2..a9863207a23 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7286,8 +7286,8 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // if name was set by the user. if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { - // Provided config might miss metadata, include from existing config. - setConsumerAssetVersionMetadata(cfg, ca.Config) + // Provided config might miss metadata, copy from existing config. + copyConsumerAssetVersionMetadata(cfg, ca.Config) if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 57ca0506509..e0e995ac94d 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -76,6 +76,43 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } +// copyConsumerAssetVersionMetadata copies versioning fields from metadata of prevCfg into cfg. +// Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg. +// +// Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences. +// MUST be followed up with a call to setConsumerAssetVersionMetadata to fix potentially lost metadata. +func copyConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + // Remove fields when no previous metadata. + if prevCfg == nil || prevCfg.Metadata == nil { + if cfg.Metadata != nil { + delete(cfg.Metadata, JSCreatedVersionMetadataKey) + delete(cfg.Metadata, JSCreatedLevelMetadataKey) + delete(cfg.Metadata, JSRequiredLevelMetadataKey) + if len(cfg.Metadata) == 0 { + cfg.Metadata = nil + } + } + return + } + + // Set if exists, delete otherwise. + setOrDeleteInMetadata(cfg, prevCfg, JSCreatedVersionMetadataKey) + setOrDeleteInMetadata(cfg, prevCfg, JSCreatedLevelMetadataKey) + setOrDeleteInMetadata(cfg, prevCfg, JSRequiredLevelMetadataKey) +} + +// setOrDeleteInMetadata sets field with key/value in metadata of cfg if set, deletes otherwise. +func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key string) { + if value, ok := prevCfg.Metadata[key]; ok { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + cfg.Metadata[key] = value + } else { + delete(cfg.Metadata, key) + } +} + // preserveAssetCreatedVersionMetadata sets metadata to contain which version and API level the asset was created on. // Preserves previous metadata, if not set it initializes versions for the metadata. func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]string) { diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 0bf9634aec6..e2659516324 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -51,7 +51,7 @@ func metadataUpdatedPrevious(featureLevel string) map[string]string { } } -func metadataEmpty() map[string]string { +func metadataOnlyRequired() map[string]string { return map[string]string{ JSRequiredLevelMetadataKey: "0", } @@ -86,13 +86,13 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { desc: "update/empty-prev-metadata", cfg: &StreamConfig{}, prev: &StreamConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, { desc: "update/empty-prev-metadata/delete-user-provided", cfg: &StreamConfig{Metadata: metadataPrevious()}, prev: &StreamConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, } { t.Run(test.desc, func(t *testing.T) { @@ -159,13 +159,13 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { desc: "update/empty-prev-metadata", cfg: &ConsumerConfig{}, prev: &ConsumerConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, { desc: "update/empty-prev-metadata/delete-user-provided", cfg: &ConsumerConfig{Metadata: metadataPrevious()}, prev: &ConsumerConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, } { t.Run(test.desc, func(t *testing.T) { @@ -177,6 +177,76 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { } } +func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { + for _, test := range []struct { + desc string + cfg *ConsumerConfig + prev *ConsumerConfig + }{ + { + desc: "no-previous-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: nil, + }, + { + desc: "nil-previous-metadata-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: nil}, + }, + { + desc: "nil-current-metadata-ignore", + cfg: &ConsumerConfig{Metadata: nil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + }, + { + desc: "copy-previous", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + }, + { + desc: "delete-missing-fields", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: make(map[string]string)}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + copyConsumerAssetVersionMetadata(test.cfg, test.prev) + + var expectedMetadata map[string]string + if test.prev != nil { + expectedMetadata = test.prev.Metadata + } + + value, ok := expectedMetadata[JSCreatedVersionMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSCreatedVersionMetadataKey] + require_False(t, ok) + } + + value, ok = expectedMetadata[JSCreatedLevelMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSCreatedLevelMetadataKey] + require_False(t, ok) + } + + value, ok = expectedMetadata[JSRequiredLevelMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSRequiredLevelMetadataKey] + require_False(t, ok) + } + }) + } +} + type server struct { replicas int js nats.JetStreamContext From e34a60c7b20c986ce6c031f96d09ec13d784e61a Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 19:26:50 +0200 Subject: [PATCH 06/12] Lint Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 36 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index e2659516324..468cafac2e2 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -179,34 +179,34 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { for _, test := range []struct { - desc string - cfg *ConsumerConfig - prev *ConsumerConfig + desc string + cfg *ConsumerConfig + prev *ConsumerConfig }{ { - desc: "no-previous-ignore", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: nil, + desc: "no-previous-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: nil, }, { - desc: "nil-previous-metadata-ignore", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: &ConsumerConfig{Metadata: nil}, + desc: "nil-previous-metadata-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: nil}, }, { - desc: "nil-current-metadata-ignore", - cfg: &ConsumerConfig{Metadata: nil}, - prev: &ConsumerConfig{Metadata: metadataPrevious()}, + desc: "nil-current-metadata-ignore", + cfg: &ConsumerConfig{Metadata: nil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, }, { - desc: "copy-previous", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: &ConsumerConfig{Metadata: metadataPrevious()}, + desc: "copy-previous", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, }, { - desc: "delete-missing-fields", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: &ConsumerConfig{Metadata: make(map[string]string)}, + desc: "delete-missing-fields", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: make(map[string]string)}, }, } { t.Run(test.desc, func(t *testing.T) { From 491aab5e32c230d3603da806697876ee0fbd55f2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 3 Sep 2024 13:42:20 +0200 Subject: [PATCH 07/12] Simplify validation Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 38 +++++++++++------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 468cafac2e2..cd4bad3d1d0 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -281,29 +281,29 @@ func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { } } +func validateMetadata(metadata map[string]string, expectedFeatureLevel string) bool { + return metadata[JSCreatedVersionMetadataKey] == VERSION || + metadata[JSCreatedLevelMetadataKey] == JSApiLevel || + metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel +} + func streamAssetVersionChecks(t *testing.T, s server) { // Add stream. sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} si, err := s.js.AddStream(&sc) require_NoError(t, err) - require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(si.Config.Metadata, "0")) // Stream info. si, err = s.js.StreamInfo(streamName) require_NoError(t, err) - require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(si.Config.Metadata, "0")) // Update stream. // Metadata set on creation should be preserved, even if not included in update. si, err = s.js.UpdateStream(&sc) require_NoError(t, err) - require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(si.Config.Metadata, "0")) } func consumerAssetVersionChecks(t *testing.T, s server) { @@ -311,24 +311,18 @@ func consumerAssetVersionChecks(t *testing.T, s server) { cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} ci, err := s.js.AddConsumer(streamName, &cc) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) // Consumer info. ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) // Update consumer. // Metadata set on creation should be preserved, even if not included in update. ci, err = s.js.UpdateConsumer(streamName, &cc) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) // Use pause advisories to know when pause/resume is applied. pauseCh := make(chan *nats.Msg, 10) @@ -342,9 +336,7 @@ func consumerAssetVersionChecks(t *testing.T, s server) { ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "1") + require_True(t, validateMetadata(ci.Config.Metadata, "1")) // Unpause consumer, should lower required API level. subj := fmt.Sprintf("$JS.API.CONSUMER.PAUSE.%s.%s", streamName, consumerName) @@ -355,9 +347,7 @@ func consumerAssetVersionChecks(t *testing.T, s server) { ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) } func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { From 604cc0e20207f52f9602a15112a8c590a709b150 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 10:09:50 +0200 Subject: [PATCH 08/12] Shorten function names Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 10 +++++----- server/jetstream_cluster.go | 6 +++--- server/jetstream_versioning.go | 22 +++++++++++----------- server/jetstream_versioning_test.go | 26 +++++++++++++------------- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index dbd93015d24..c56a529ec1e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1423,7 +1423,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } // Initialize asset version metadata. - setStreamAssetVersionMetadata(&cfg.StreamConfig, nil) + setStaticStreamMetadata(&cfg.StreamConfig, nil) streamName := streamNameFromSubject(subject) if streamName != cfg.Name { @@ -1561,7 +1561,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } // Update asset version metadata. - setStreamAssetVersionMetadata(&cfg, &mset.cfg) + setStaticStreamMetadata(&cfg, &mset.cfg) if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) @@ -4043,7 +4043,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } // Initialize/update asset version metadata. - setConsumerAssetVersionMetadata(&req.Config, oldCfg) + setStaticConsumerMetadata(&req.Config, oldCfg) o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic) @@ -4603,7 +4603,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account // Update asset version metadata due to updating pause/resume. // Only PauseUntil is updated above, so reuse config for both. - setConsumerAssetVersionMetadata(nca.Config, nca.Config) + setStaticConsumerMetadata(nca.Config, nca.Config) eca := encodeAddConsumerAssignment(&nca) cc.meta.Propose(eca) @@ -4640,7 +4640,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account // Update asset version metadata due to updating pause/resume. // Only PauseUntil is updated above, so reuse config for both. - setConsumerAssetVersionMetadata(&ncfg, &ncfg) + setStaticConsumerMetadata(&ncfg, &ncfg) if err := obs.updateConfig(&ncfg); err != nil { // The only type of error that should be returned here is from o.store, diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a9863207a23..16dfabc32b7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6246,7 +6246,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } // Update asset version metadata. - setStreamAssetVersionMetadata(cfg, osa.Config) + setStaticStreamMetadata(cfg, osa.Config) var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { @@ -7287,7 +7287,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { // Provided config might miss metadata, copy from existing config. - copyConsumerAssetVersionMetadata(cfg, ca.Config) + copyConsumerMetadata(cfg, ca.Config) if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() @@ -7308,7 +7308,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec if ca != nil { oldCfg = ca.Config } - setConsumerAssetVersionMetadata(cfg, oldCfg) + setStaticConsumerMetadata(cfg, oldCfg) // If this is new consumer. if ca == nil { diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index e0e995ac94d..60b0658cc3f 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -22,11 +22,11 @@ const ( JSRequiredLevelMetadataKey = "_nats.server.require.api_level" ) -// setStreamAssetVersionMetadata sets JetStream stream metadata, like the server version and API level. +// setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level. // Given: // - cfg!=nil, prevCfg==nil add stream: adds created and required metadata // - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated -func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { +func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { if cfg.Metadata == nil { cfg.Metadata = make(map[string]string) } @@ -39,17 +39,17 @@ func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { prevMetadata = make(map[string]string) } } - preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + preserveCreatedMetadata(cfg.Metadata, prevMetadata) requiredApiLevel := "0" cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } -// setConsumerAssetVersionMetadata sets JetStream consumer metadata, like the server version and API level. +// setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level. // Given: // - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata // - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated -func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { +func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { if cfg.Metadata == nil { cfg.Metadata = make(map[string]string) } @@ -62,7 +62,7 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi prevMetadata = make(map[string]string) } } - preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + preserveCreatedMetadata(cfg.Metadata, prevMetadata) requiredApiLevel := "0" @@ -76,12 +76,12 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } -// copyConsumerAssetVersionMetadata copies versioning fields from metadata of prevCfg into cfg. +// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg. // Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg. // // Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences. -// MUST be followed up with a call to setConsumerAssetVersionMetadata to fix potentially lost metadata. -func copyConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { +// MUST be followed up with a call to setStaticConsumerMetadata to fix potentially lost metadata. +func copyConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { // Remove fields when no previous metadata. if prevCfg == nil || prevCfg.Metadata == nil { if cfg.Metadata != nil { @@ -113,9 +113,9 @@ func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key str } } -// preserveAssetCreatedVersionMetadata sets metadata to contain which version and API level the asset was created on. +// preserveCreatedMetadata sets metadata to contain which version and API level the asset was created on. // Preserves previous metadata, if not set it initializes versions for the metadata. -func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]string) { +func preserveCreatedMetadata(metadata, prevMetadata map[string]string) { if prevMetadata == nil { metadata[JSCreatedVersionMetadataKey] = VERSION metadata[JSCreatedLevelMetadataKey] = JSApiLevel diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index cd4bad3d1d0..7b2b5ac0909 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -57,7 +57,7 @@ func metadataOnlyRequired() map[string]string { } } -func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { +func TestJetStreamSetStaticStreamMetadata(t *testing.T) { for _, test := range []struct { desc string cfg *StreamConfig @@ -96,7 +96,7 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - setStreamAssetVersionMetadata(test.cfg, test.prev) + setStaticStreamMetadata(test.cfg, test.prev) require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) @@ -104,7 +104,7 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { } } -func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { +func TestJetStreamSetStaticConsumerMetadata(t *testing.T) { pauseUntil := time.Unix(0, 0) pauseUntilZero := time.Time{} for _, test := range []struct { @@ -169,7 +169,7 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - setConsumerAssetVersionMetadata(test.cfg, test.prev) + setStaticConsumerMetadata(test.cfg, test.prev) require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) @@ -177,7 +177,7 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { } } -func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { +func TestJetStreamCopyConsumerMetadata(t *testing.T) { for _, test := range []struct { desc string cfg *ConsumerConfig @@ -210,7 +210,7 @@ func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - copyConsumerAssetVersionMetadata(test.cfg, test.prev) + copyConsumerMetadata(test.cfg, test.prev) var expectedMetadata map[string]string if test.prev != nil { @@ -258,7 +258,7 @@ const ( consumerName = "CONSUMER" ) -func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { +func TestJetStreamMetadataMutations(t *testing.T) { single := RunBasicJetStreamServer(t) defer single.Shutdown() nc, js := jsClientConnect(t, single) @@ -275,8 +275,8 @@ func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { {3, cjs, cnc}, } { t.Run(fmt.Sprintf("R%d", s.replicas), func(t *testing.T) { - streamAssetVersionChecks(t, s) - consumerAssetVersionChecks(t, s) + streamMetadataChecks(t, s) + consumerMetadataChecks(t, s) }) } } @@ -287,7 +287,7 @@ func validateMetadata(metadata map[string]string, expectedFeatureLevel string) b metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel } -func streamAssetVersionChecks(t *testing.T, s server) { +func streamMetadataChecks(t *testing.T, s server) { // Add stream. sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} si, err := s.js.AddStream(&sc) @@ -306,7 +306,7 @@ func streamAssetVersionChecks(t *testing.T, s server) { require_True(t, validateMetadata(si.Config.Metadata, "0")) } -func consumerAssetVersionChecks(t *testing.T, s server) { +func consumerMetadataChecks(t *testing.T, s server) { // Add consumer. cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} ci, err := s.js.AddConsumer(streamName, &cc) @@ -350,7 +350,7 @@ func consumerAssetVersionChecks(t *testing.T, s server) { require_True(t, validateMetadata(ci.Config.Metadata, "0")) } -func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { +func TestJetStreamMetadataStreamRestoreAndRestart(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() nc, js := jsClientConnect(t, s) @@ -380,7 +380,7 @@ func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { require_Equal(t, len(si.Config.Metadata), 0) } -func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing.T) { +func TestJetStreamMetadataStreamRestoreAndRestartCluster(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() nc, js := jsClientConnect(t, c.randomServer()) From ca9ec9513932da2bc1ec57d40dc2325f43d4c660 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 12:52:20 +0200 Subject: [PATCH 09/12] Use int for JSApiLevel Signed-off-by: Maurice van Veen --- server/jetstream_versioning.go | 16 +++++++++------- server/jetstream_versioning_test.go | 5 +++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 60b0658cc3f..1c49468650e 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -13,9 +13,11 @@ package server +import "strconv" + const ( // JSApiLevel is the maximum supported JetStream API level for this server. - JSApiLevel = "1" + JSApiLevel int = 1 JSCreatedVersionMetadataKey = "_nats.created.server.version" JSCreatedLevelMetadataKey = "_nats.created.server.api_level" @@ -41,8 +43,8 @@ func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { } preserveCreatedMetadata(cfg.Metadata, prevMetadata) - requiredApiLevel := "0" - cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel + var requiredApiLevel int + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) } // setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level. @@ -64,16 +66,16 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { } preserveCreatedMetadata(cfg.Metadata, prevMetadata) - requiredApiLevel := "0" + var requiredApiLevel int // Added in 2.11, absent | zero is the feature is not used. // one could be stricter and say even if its set but the time // has already passed it is also not needed to restore the consumer if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() { - requiredApiLevel = "1" + requiredApiLevel = 1 } - cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) } // copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg. @@ -118,7 +120,7 @@ func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key str func preserveCreatedMetadata(metadata, prevMetadata map[string]string) { if prevMetadata == nil { metadata[JSCreatedVersionMetadataKey] = VERSION - metadata[JSCreatedLevelMetadataKey] = JSApiLevel + metadata[JSCreatedLevelMetadataKey] = strconv.Itoa(JSApiLevel) return } diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 7b2b5ac0909..ca77f22e3c4 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "strconv" "testing" "time" @@ -30,7 +31,7 @@ import ( func metadataAllSet(featureLevel string) map[string]string { return map[string]string{ JSCreatedVersionMetadataKey: VERSION, - JSCreatedLevelMetadataKey: JSApiLevel, + JSCreatedLevelMetadataKey: strconv.Itoa(JSApiLevel), JSRequiredLevelMetadataKey: featureLevel, } } @@ -283,7 +284,7 @@ func TestJetStreamMetadataMutations(t *testing.T) { func validateMetadata(metadata map[string]string, expectedFeatureLevel string) bool { return metadata[JSCreatedVersionMetadataKey] == VERSION || - metadata[JSCreatedLevelMetadataKey] == JSApiLevel || + metadata[JSCreatedLevelMetadataKey] == strconv.Itoa(JSApiLevel) || metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel } From 285f3a65b621f8a81681138e7d24955e54ce4f18 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 13:06:09 +0200 Subject: [PATCH 10/12] Fix format Signed-off-by: Maurice van Veen --- server/jetstream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream.go b/server/jetstream.go index 3b9132be4f6..b0e22ef6eb7 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -467,7 +467,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Noticef(" TPM File: %q, Pcr: %d", opts.JetStreamTpm.KeysFile, opts.JetStreamTpm.Pcr) } - s.Noticef(" API Level: %s", JSApiLevel) + s.Noticef(" API Level: %d", JSApiLevel) s.Noticef("-------------------------------------------") // Setup our internal subscriptions. From b942af446a0516bf305b74f16e2c65f70724b0d4 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 6 Sep 2024 22:44:10 +0200 Subject: [PATCH 11/12] Manually construct empty backup Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 76 +++++++++++------- .../restore_empty_R1F_stream/backup.json | 35 -------- .../restore_empty_R1F_stream/stream.tar.s2 | Bin 861 -> 0 bytes .../restore_empty_R3F_stream/backup.json | 35 -------- .../restore_empty_R3F_stream/stream.tar.s2 | Bin 861 -> 0 bytes 5 files changed, 47 insertions(+), 99 deletions(-) delete mode 100644 test/configs/jetstream/restore_empty_R1F_stream/backup.json delete mode 100644 test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 delete mode 100644 test/configs/jetstream/restore_empty_R3F_stream/backup.json delete mode 100644 test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index ca77f22e3c4..c0a3f5d2676 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -17,10 +17,11 @@ package server import ( + "archive/tar" + "bytes" "encoding/json" "fmt" - "io" - "os" + "github.com/klauspost/compress/s2" "strconv" "testing" "time" @@ -357,8 +358,7 @@ func TestJetStreamMetadataStreamRestoreAndRestart(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - path := "../test/configs/jetstream/restore_empty_R1F_stream" - restoreStreamFromPath(t, nc, path) + restoreEmptyStream(t, nc, 1) // Stream restore should result in empty metadata to be preserved. si, err := js.StreamInfo(streamName) @@ -387,8 +387,7 @@ func TestJetStreamMetadataStreamRestoreAndRestartCluster(t *testing.T) { nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - path := "../test/configs/jetstream/restore_empty_R3F_stream" - restoreStreamFromPath(t, nc, path) + restoreEmptyStream(t, nc, 3) // Stream restore should result in empty metadata to be preserved. si, err := js.StreamInfo(streamName) @@ -410,16 +409,17 @@ func TestJetStreamMetadataStreamRestoreAndRestartCluster(t *testing.T) { require_Equal(t, len(si.Config.Metadata), 0) } -func restoreStreamFromPath(t *testing.T, nc *nats.Conn, path string) { - var rreq JSApiStreamRestoreRequest - buf, err := os.ReadFile(fmt.Sprintf("%s/backup.json", path)) - require_NoError(t, err) - err = json.Unmarshal(buf, &rreq) - require_NoError(t, err) - - data, err := os.Open(fmt.Sprintf("%s/stream.tar.s2", path)) +func restoreEmptyStream(t *testing.T, nc *nats.Conn, replicas int) { + rreq := JSApiStreamRestoreRequest{ + Config: StreamConfig{ + Name: "STREAM", + Retention: LimitsPolicy, + Storage: FileStorage, + Replicas: replicas, + }, + } + buf, err := json.Marshal(rreq) require_NoError(t, err) - defer data.Close() var rresp JSApiStreamRestoreResponse msg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, rreq.Config.Name), buf, 5*time.Second) @@ -429,21 +429,39 @@ func restoreStreamFromPath(t *testing.T, nc *nats.Conn, path string) { t.Fatalf("Error on restore: %+v", rresp.Error) } - var chunk [1024]byte - for { - n, err := data.Read(chunk[:]) - if err == io.EOF { - break - } - require_NoError(t, err) - - msg, err = nc.Request(rresp.DeliverSubject, chunk[:n], 5*time.Second) - require_NoError(t, err) - json.Unmarshal(msg.Data, &rresp) - if rresp.Error != nil { - t.Fatalf("Error on restore: %+v", rresp.Error) - } + // Construct empty stream.tar.s2 (only containing meta.inf). + fsi := FileStreamInfo{StreamConfig: rreq.Config} + fsij, err := json.Marshal(fsi) + require_NoError(t, err) + + hdr := &tar.Header{ + Name: JetStreamMetaFile, + Mode: 0600, + Uname: "nats", + Gname: "nats", + Size: int64(len(fsij)), + Format: tar.FormatPAX, + } + var buffer bytes.Buffer + enc := s2.NewWriter(&buffer) + tw := tar.NewWriter(enc) + err = tw.WriteHeader(hdr) + require_NoError(t, err) + _, err = tw.Write(fsij) + require_NoError(t, err) + err = tw.Close() + require_NoError(t, err) + err = enc.Close() + require_NoError(t, err) + + data := buffer.Bytes() + msg, err = nc.Request(rresp.DeliverSubject, data, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) } + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) require_NoError(t, err) err = json.Unmarshal(msg.Data, &rresp) diff --git a/test/configs/jetstream/restore_empty_R1F_stream/backup.json b/test/configs/jetstream/restore_empty_R1F_stream/backup.json deleted file mode 100644 index ebc48e723c4..00000000000 --- a/test/configs/jetstream/restore_empty_R1F_stream/backup.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "config": { - "name": "STREAM", - "subjects": [ - "stream" - ], - "retention": "limits", - "max_consumers": -1, - "max_msgs_per_subject": -1, - "max_msgs": -1, - "max_bytes": -1, - "max_age": 0, - "max_msg_size": -1, - "storage": "file", - "discard": "old", - "num_replicas": 1, - "duplicate_window": 120000000000, - "sealed": false, - "deny_delete": false, - "deny_purge": false, - "allow_rollup_hdrs": false, - "allow_direct": true, - "mirror_direct": false, - "consumer_limits": {} - }, - "state": { - "messages": 0, - "bytes": 0, - "first_seq": 0, - "first_ts": "0001-01-01T00:00:00Z", - "last_seq": 0, - "last_ts": "0001-01-01T00:00:00Z", - "consumer_count": 0 - } -} \ No newline at end of file diff --git a/test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 b/test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 deleted file mode 100644 index a0e9234d609bdb7cd8cc5d767438ac54723b5b6f..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 861 zcmY+Czi-n(6vtmeLLC}tBI?1VL6t>Ok&KZ7hq$u&PR3T&TT#&AOo%@|!zfG?sP9KT$Mv|JO1ER) zWzwd{dBjx}wW%*;Eg1Egs}4=LEL?Y7=F4%dVaRv7d|BI@yFS4q-|u#u6_*QjLJvE~Za{qR22lorlW}7(cVI&o014P@t~CN;>)B8Jx@9kxdB81<7SBV+#}w z4#Ls)r2G&{Yq(_?CdD=Db*h-x-~$w7SmH3wDx-2(wpMkUQpMgLn1TFBDCDt)q z$gTu1i;raPgOVXwg@L?D5YnItnZZ=$rIJcPb}*YQ%Vh@h$$^ZZW diff --git a/test/configs/jetstream/restore_empty_R3F_stream/backup.json b/test/configs/jetstream/restore_empty_R3F_stream/backup.json deleted file mode 100644 index f9bdb2d49dc..00000000000 --- a/test/configs/jetstream/restore_empty_R3F_stream/backup.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "config": { - "name": "STREAM", - "subjects": [ - "stream" - ], - "retention": "limits", - "max_consumers": -1, - "max_msgs_per_subject": -1, - "max_msgs": -1, - "max_bytes": -1, - "max_age": 0, - "max_msg_size": -1, - "storage": "file", - "discard": "old", - "num_replicas": 3, - "duplicate_window": 120000000000, - "sealed": false, - "deny_delete": false, - "deny_purge": false, - "allow_rollup_hdrs": false, - "allow_direct": true, - "mirror_direct": false, - "consumer_limits": {} - }, - "state": { - "messages": 0, - "bytes": 0, - "first_seq": 0, - "first_ts": "0001-01-01T00:00:00Z", - "last_seq": 0, - "last_ts": "0001-01-01T00:00:00Z", - "consumer_count": 0 - } -} \ No newline at end of file diff --git a/test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 b/test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 deleted file mode 100644 index 2775b7461ce8640229e054747c0df9547fe90698..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 861 zcmY*XyKmD#82=Iy;y@sYS`RJ_iY%x=O49TBm6(@kp+Ln;ZRh|(#)&V*;77P~lm=7; zNQfDUftdjbF|op*0IX~*EC?h}5DQ`{=N1OW@9uZM*L@q?0hTdq4==!lZE(KW{&w#~ z`=YaUnm8_@tbt|&5<3kq>_Ist!cjnZ0LbSu2=X8bC4^K1vXIa01EeU1ra^VB{D=cr z&cXD6#SR68L^x8O7vWi~qG}L(0Xd?WSZnE~iS>rAtGZ$~Rc(g|mGXWGUcs%OBAmh# zY0i$2F&ZBg;2JWK4vJRb+MnhMohPei!%7z&{iGZ7~5R^j2B%E#U!l&3^k zAK$+t&r{;W#FZ@>Bdpa?vyNMBrD^H9r8FDZ)HT%9H1o1tlS3yU9JkzFI(hsYpV^>u zgLGpiTUTTjbDluHT9YY>Nf>)k$f3R$c>KO52hN(^jY2jEcyF>*SGJAxntn1LkXym(zdK}TsCvwm8Nn&ph*p&2r zuj_CLwI;iR%^@cC&69&ikqoM73l&6xdm<|ZH z=}{U{QA9n*XMASF@rf%I54a>8*)H*UTw@Xn{Q+H-Ci2$~(8%`(efzpgnK*U!UugSp z71o{g^{>n61rP*Y#;5bJ(t`bG*7sN?SONh|39hG;AD+Yc%v}ybm@h~^V<_K2kYEN5 z6V+^XNKfze^p@U5#CV-57bN%qC0-Vr#aUsRhb3zbx5HViac&L@3s5Rx199?hTHvTe zU>+aJJOCl1VG(i#RYORE4rF$w>Mw;<3bH%1*&1JFP@n$(F&sUAzpGfzO7IrA^ODl& q_&1gBd~?5S%>63v6@ Date: Sat, 7 Sep 2024 00:09:24 +0200 Subject: [PATCH 12/12] Lint Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index c0a3f5d2676..6a87348041b 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -21,11 +21,12 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/klauspost/compress/s2" "strconv" "testing" "time" + "github.com/klauspost/compress/s2" + "github.com/nats-io/nats.go" )