From d3ab9f83dfc65b0dde8a5975da55ad046ab2de35 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 16:00:20 +0200 Subject: [PATCH 01/14] (2.11) ADR-44: JetStream Asset Versioning in Metadata Signed-off-by: Maurice van Veen --- server/jetstream.go | 1 + server/jetstream_api.go | 24 +- server/jetstream_cluster.go | 11 + server/jetstream_versioning.go | 99 +++++ server/jetstream_versioning_test.go | 386 ++++++++++++++++++ .../restore_empty_R1F_stream/backup.json | 35 ++ .../restore_empty_R1F_stream/stream.tar.s2 | Bin 0 -> 861 bytes .../restore_empty_R3F_stream/backup.json | 35 ++ .../restore_empty_R3F_stream/stream.tar.s2 | Bin 0 -> 861 bytes 9 files changed, 589 insertions(+), 2 deletions(-) create mode 100644 server/jetstream_versioning.go create mode 100644 server/jetstream_versioning_test.go create mode 100644 test/configs/jetstream/restore_empty_R1F_stream/backup.json create mode 100644 test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 create mode 100644 test/configs/jetstream/restore_empty_R3F_stream/backup.json create mode 100644 test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 diff --git a/server/jetstream.go b/server/jetstream.go index 3a986cccd4a..3b9132be4f6 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -467,6 +467,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { s.Noticef(" TPM File: %q, Pcr: %d", opts.JetStreamTpm.KeysFile, opts.JetStreamTpm.Pcr) } + s.Noticef(" API Level: %s", JSApiLevel) s.Noticef("-------------------------------------------") // Setup our internal subscriptions. diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f33c4b3953f..dbd93015d24 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1422,6 +1422,9 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, return } + // Initialize asset version metadata. + setStreamAssetVersionMetadata(&cfg.StreamConfig, nil) + streamName := streamNameFromSubject(subject) if streamName != cfg.Name { resp.Error = NewJSStreamMismatchError() @@ -1557,6 +1560,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } + // Update asset version metadata. + setStreamAssetVersionMetadata(&cfg, &mset.cfg) + if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -4028,12 +4034,17 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun return } - // If the consumer already exists then don't allow updating the PauseUntil, just set - // it back to whatever the current configured value is. + var oldCfg *ConsumerConfig if o := stream.lookupConsumer(consumerName); o != nil { + oldCfg = &o.cfg + // If the consumer already exists then don't allow updating the PauseUntil, just set + // it back to whatever the current configured value is. req.Config.PauseUntil = o.cfg.PauseUntil } + // Initialize/update asset version metadata. + setConsumerAssetVersionMetadata(&req.Config, oldCfg) + o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic) if err != nil { @@ -4589,6 +4600,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } else { nca.Config.PauseUntil = nil } + + // Update asset version metadata due to updating pause/resume. + // Only PauseUntil is updated above, so reuse config for both. + setConsumerAssetVersionMetadata(nca.Config, nca.Config) + eca := encodeAddConsumerAssignment(&nca) cc.meta.Propose(eca) @@ -4622,6 +4638,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account ncfg.PauseUntil = nil } + // Update asset version metadata due to updating pause/resume. + // Only PauseUntil is updated above, so reuse config for both. + setConsumerAssetVersionMetadata(&ncfg, &ncfg) + if err := obs.updateConfig(&ncfg); err != nil { // The only type of error that should be returned here is from o.store, // so use a store failed error type. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a59ae021234..64f142e0942 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6244,6 +6244,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) return } + + // Update asset version metadata. + setStreamAssetVersionMetadata(cfg, osa.Config) + var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { js.mu.Unlock() @@ -7296,6 +7300,13 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } } + // Initialize/update asset version metadata. + var oldCfg *ConsumerConfig + if ca != nil { + oldCfg = ca.Config + } + setConsumerAssetVersionMetadata(cfg, oldCfg) + // If this is new consumer. if ca == nil { if action == ActionUpdate { diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go new file mode 100644 index 00000000000..57ca0506509 --- /dev/null +++ b/server/jetstream_versioning.go @@ -0,0 +1,99 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +const ( + // JSApiLevel is the maximum supported JetStream API level for this server. + JSApiLevel = "1" + + JSCreatedVersionMetadataKey = "_nats.created.server.version" + JSCreatedLevelMetadataKey = "_nats.created.server.api_level" + JSRequiredLevelMetadataKey = "_nats.server.require.api_level" +) + +// setStreamAssetVersionMetadata sets JetStream stream metadata, like the server version and API level. +// Given: +// - cfg!=nil, prevCfg==nil add stream: adds created and required metadata +// - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated +func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + + var prevMetadata map[string]string + if prevCfg != nil { + prevMetadata = prevCfg.Metadata + if prevMetadata == nil { + // Initialize to empty to indicate we had a previous config but metadata was missing. + prevMetadata = make(map[string]string) + } + } + preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + + requiredApiLevel := "0" + cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel +} + +// setConsumerAssetVersionMetadata sets JetStream consumer metadata, like the server version and API level. +// Given: +// - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata +// - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated +func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + + var prevMetadata map[string]string + if prevCfg != nil { + prevMetadata = prevCfg.Metadata + if prevMetadata == nil { + // Initialize to empty to indicate we had a previous config but metadata was missing. + prevMetadata = make(map[string]string) + } + } + preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + + requiredApiLevel := "0" + + // Added in 2.11, absent | zero is the feature is not used. + // one could be stricter and say even if its set but the time + // has already passed it is also not needed to restore the consumer + if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() { + requiredApiLevel = "1" + } + + cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel +} + +// preserveAssetCreatedVersionMetadata sets metadata to contain which version and API level the asset was created on. +// Preserves previous metadata, if not set it initializes versions for the metadata. +func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]string) { + if prevMetadata == nil { + metadata[JSCreatedVersionMetadataKey] = VERSION + metadata[JSCreatedLevelMetadataKey] = JSApiLevel + return + } + + // Preserve previous metadata if it was set, but delete if not since it could be user-provided. + if v := prevMetadata[JSCreatedVersionMetadataKey]; v != _EMPTY_ { + metadata[JSCreatedVersionMetadataKey] = v + } else { + delete(metadata, JSCreatedVersionMetadataKey) + } + if v := prevMetadata[JSCreatedLevelMetadataKey]; v != _EMPTY_ { + metadata[JSCreatedLevelMetadataKey] = v + } else { + delete(metadata, JSCreatedLevelMetadataKey) + } +} diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go new file mode 100644 index 00000000000..2a4dbca1da1 --- /dev/null +++ b/server/jetstream_versioning_test.go @@ -0,0 +1,386 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "encoding/json" + "fmt" + "github.com/nats-io/nats.go" + "io" + "os" + "testing" + "time" +) + +func metadataAllSet(featureLevel string) map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: VERSION, + JSCreatedLevelMetadataKey: JSApiLevel, + JSRequiredLevelMetadataKey: featureLevel, + } +} + +func metadataPrevious() map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: "previous-version", + JSCreatedLevelMetadataKey: "previous-level", + JSRequiredLevelMetadataKey: "previous-level", + } +} + +func metadataUpdatedPrevious(featureLevel string) map[string]string { + return map[string]string{ + JSCreatedVersionMetadataKey: "previous-version", + JSCreatedLevelMetadataKey: "previous-level", + JSRequiredLevelMetadataKey: featureLevel, + } +} + +func metadataEmpty() map[string]string { + return map[string]string{ + JSRequiredLevelMetadataKey: "0", + } +} + +func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { + for _, test := range []struct { + desc string + cfg *StreamConfig + prev *StreamConfig + expectedMetadata map[string]string + }{ + { + desc: "create", + cfg: &StreamConfig{}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/overwrite-user-provided", + cfg: &StreamConfig{Metadata: metadataPrevious()}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "update", + cfg: &StreamConfig{}, + prev: &StreamConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "update/empty-prev-metadata", + cfg: &StreamConfig{}, + prev: &StreamConfig{}, + expectedMetadata: metadataEmpty(), + }, + { + desc: "update/empty-prev-metadata/delete-user-provided", + cfg: &StreamConfig{Metadata: metadataPrevious()}, + prev: &StreamConfig{}, + expectedMetadata: metadataEmpty(), + }, + } { + t.Run(test.desc, func(t *testing.T) { + setStreamAssetVersionMetadata(test.cfg, test.prev) + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) + }) + } +} + +func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { + pauseUntil := time.Unix(0, 0) + pauseUntilZero := time.Time{} + for _, test := range []struct { + desc string + cfg *ConsumerConfig + prev *ConsumerConfig + expectedMetadata map[string]string + }{ + { + desc: "create", + cfg: &ConsumerConfig{}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/PauseUntil/zero", + cfg: &ConsumerConfig{PauseUntil: &pauseUntilZero}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "create/PauseUntil", + cfg: &ConsumerConfig{PauseUntil: &pauseUntil}, + prev: nil, + expectedMetadata: metadataAllSet("1"), + }, + { + desc: "create/overwrite-user-provided", + cfg: &ConsumerConfig{Metadata: metadataPrevious()}, + prev: nil, + expectedMetadata: metadataAllSet("0"), + }, + { + desc: "update", + cfg: &ConsumerConfig{}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "create/PauseUntil/zero", + cfg: &ConsumerConfig{PauseUntil: &pauseUntilZero}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("0"), + }, + { + desc: "update/PauseUntil", + cfg: &ConsumerConfig{PauseUntil: &pauseUntil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + expectedMetadata: metadataUpdatedPrevious("1"), + }, + { + desc: "update/empty-prev-metadata", + cfg: &ConsumerConfig{}, + prev: &ConsumerConfig{}, + expectedMetadata: metadataEmpty(), + }, + { + desc: "update/empty-prev-metadata/delete-user-provided", + cfg: &ConsumerConfig{Metadata: metadataPrevious()}, + prev: &ConsumerConfig{}, + expectedMetadata: metadataEmpty(), + }, + } { + t.Run(test.desc, func(t *testing.T) { + setConsumerAssetVersionMetadata(test.cfg, test.prev) + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) + }) + } +} + +type server struct { + replicas int + js nats.JetStreamContext + nc *nats.Conn +} + +const ( + streamName = "STREAM" + consumerName = "CONSUMER" +) + +func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { + single := RunBasicJetStreamServer(t) + defer single.Shutdown() + nc, js := jsClientConnect(t, single) + defer nc.Close() + + cluster := createJetStreamClusterExplicit(t, "R3S", 3) + defer cluster.shutdown() + cnc, cjs := jsClientConnect(t, cluster.randomServer()) + defer cnc.Close() + + // Test for both single server and clustered mode. + for _, s := range []server{ + {1, js, nc}, + {3, cjs, cnc}, + } { + t.Run(fmt.Sprintf("R%d", s.replicas), func(t *testing.T) { + streamAssetVersionChecks(t, s) + consumerAssetVersionChecks(t, s) + }) + } +} + +func streamAssetVersionChecks(t *testing.T, s server) { + // Add stream. + sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} + si, err := s.js.AddStream(&sc) + require_NoError(t, err) + require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Stream info. + si, err = s.js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Update stream. + // Metadata set on creation should be preserved, even if not included in update. + si, err = s.js.UpdateStream(&sc) + require_NoError(t, err) + require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") +} + +func consumerAssetVersionChecks(t *testing.T, s server) { + // Add consumer. + cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} + ci, err := s.js.AddConsumer(streamName, &cc) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Consumer info. + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Update consumer. + // Metadata set on creation should be preserved, even if not included in update. + ci, err = s.js.UpdateConsumer(streamName, &cc) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + + // Use pause advisories to know when pause/resume is applied. + pauseCh := make(chan *nats.Msg, 10) + _, err = s.nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".STREAM.CONSUMER", pauseCh) + require_NoError(t, err) + + // Pause consumer, should up required API level. + jsTestPause_PauseConsumer(t, s.nc, streamName, consumerName, time.Now().Add(time.Second*3)) + require_ChanRead(t, pauseCh, time.Second*2) + require_Len(t, len(pauseCh), 0) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "1") + + // Unpause consumer, should lower required API level. + subj := fmt.Sprintf("$JS.API.CONSUMER.PAUSE.%s.%s", streamName, consumerName) + _, err = s.nc.Request(subj, nil, time.Second) + require_NoError(t, err) + require_ChanRead(t, pauseCh, time.Second*2) + require_Len(t, len(pauseCh), 0) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) + require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) + require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") +} + +func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + path := "../test/configs/jetstream/restore_empty_R1F_stream" + restoreStreamFromPath(t, nc, path) + + // Stream restore should result in empty metadata to be preserved. + si, err := js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) + + // Restart server. + port := s.opts.Port + sd := s.StoreDir() + nc.Close() + s.Shutdown() + s = RunJetStreamServerOnPort(port, sd) + defer s.Shutdown() + nc, js = jsClientConnect(t, s) + defer nc.Close() + + // After restart (or upgrade) metadata data should remain empty. + si, err = js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) +} + +func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + path := "../test/configs/jetstream/restore_empty_R3F_stream" + restoreStreamFromPath(t, nc, path) + + // Stream restore should result in empty metadata to be preserved. + si, err := js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) + + // Restart cluster. + c.stopAll() + c.restartAllSamePorts() + defer c.shutdown() + c.waitOnAllCurrent() + c.waitOnStreamLeader("$G", streamName) + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // After restart (or upgrade) metadata data should remain empty. + si, err = js.StreamInfo(streamName) + require_NoError(t, err) + require_Equal(t, len(si.Config.Metadata), 0) +} + +func restoreStreamFromPath(t *testing.T, nc *nats.Conn, path string) { + var rreq JSApiStreamRestoreRequest + buf, err := os.ReadFile(fmt.Sprintf("%s/backup.json", path)) + require_NoError(t, err) + err = json.Unmarshal(buf, &rreq) + require_NoError(t, err) + + data, err := os.Open(fmt.Sprintf("%s/stream.tar.s2", path)) + require_NoError(t, err) + defer data.Close() + + var rresp JSApiStreamRestoreResponse + msg, err := nc.Request(fmt.Sprintf(JSApiStreamRestoreT, rreq.Config.Name), buf, 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + + var chunk [1024]byte + for { + n, err := data.Read(chunk[:]) + if err == io.EOF { + break + } + require_NoError(t, err) + + msg, err = nc.Request(rresp.DeliverSubject, chunk[:n], 5*time.Second) + require_NoError(t, err) + json.Unmarshal(msg.Data, &rresp) + if rresp.Error != nil { + t.Fatalf("Error on restore: %+v", rresp.Error) + } + } + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) + require_NoError(t, err) + err = json.Unmarshal(msg.Data, &rresp) + require_NoError(t, err) +} diff --git a/test/configs/jetstream/restore_empty_R1F_stream/backup.json b/test/configs/jetstream/restore_empty_R1F_stream/backup.json new file mode 100644 index 00000000000..ebc48e723c4 --- /dev/null +++ b/test/configs/jetstream/restore_empty_R1F_stream/backup.json @@ -0,0 +1,35 @@ +{ + "config": { + "name": "STREAM", + "subjects": [ + "stream" + ], + "retention": "limits", + "max_consumers": -1, + "max_msgs_per_subject": -1, + "max_msgs": -1, + "max_bytes": -1, + "max_age": 0, + "max_msg_size": -1, + "storage": "file", + "discard": "old", + "num_replicas": 1, + "duplicate_window": 120000000000, + "sealed": false, + "deny_delete": false, + "deny_purge": false, + "allow_rollup_hdrs": false, + "allow_direct": true, + "mirror_direct": false, + "consumer_limits": {} + }, + "state": { + "messages": 0, + "bytes": 0, + "first_seq": 0, + "first_ts": "0001-01-01T00:00:00Z", + "last_seq": 0, + "last_ts": "0001-01-01T00:00:00Z", + "consumer_count": 0 + } +} \ No newline at end of file diff --git a/test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 b/test/configs/jetstream/restore_empty_R1F_stream/stream.tar.s2 new file mode 100644 index 0000000000000000000000000000000000000000..a0e9234d609bdb7cd8cc5d767438ac54723b5b6f GIT binary patch literal 861 zcmY+Czi-n(6vtmeLLC}tBI?1VL6t>Ok&KZ7hq$u&PR3T&TT#&AOo%@|!zfG?sP9KT$Mv|JO1ER) zWzwd{dBjx}wW%*;Eg1Egs}4=LEL?Y7=F4%dVaRv7d|BI@yFS4q-|u#u6_*QjLJvE~Za{qR22lorlW}7(cVI&o014P@t~CN;>)B8Jx@9kxdB81<7SBV+#}w z4#Ls)r2G&{Yq(_?CdD=Db*h-x-~$w7SmH3wDx-2(wpMkUQpMgLn1TFBDCDt)q z$gTu1i;raPgOVXwg@L?D5YnItnZZ=$rIJcPb}*YQ%Vh@h$$^ZZW literal 0 HcmV?d00001 diff --git a/test/configs/jetstream/restore_empty_R3F_stream/backup.json b/test/configs/jetstream/restore_empty_R3F_stream/backup.json new file mode 100644 index 00000000000..f9bdb2d49dc --- /dev/null +++ b/test/configs/jetstream/restore_empty_R3F_stream/backup.json @@ -0,0 +1,35 @@ +{ + "config": { + "name": "STREAM", + "subjects": [ + "stream" + ], + "retention": "limits", + "max_consumers": -1, + "max_msgs_per_subject": -1, + "max_msgs": -1, + "max_bytes": -1, + "max_age": 0, + "max_msg_size": -1, + "storage": "file", + "discard": "old", + "num_replicas": 3, + "duplicate_window": 120000000000, + "sealed": false, + "deny_delete": false, + "deny_purge": false, + "allow_rollup_hdrs": false, + "allow_direct": true, + "mirror_direct": false, + "consumer_limits": {} + }, + "state": { + "messages": 0, + "bytes": 0, + "first_seq": 0, + "first_ts": "0001-01-01T00:00:00Z", + "last_seq": 0, + "last_ts": "0001-01-01T00:00:00Z", + "consumer_count": 0 + } +} \ No newline at end of file diff --git a/test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 b/test/configs/jetstream/restore_empty_R3F_stream/stream.tar.s2 new file mode 100644 index 0000000000000000000000000000000000000000..2775b7461ce8640229e054747c0df9547fe90698 GIT binary patch literal 861 zcmY*XyKmD#82=Iy;y@sYS`RJ_iY%x=O49TBm6(@kp+Ln;ZRh|(#)&V*;77P~lm=7; zNQfDUftdjbF|op*0IX~*EC?h}5DQ`{=N1OW@9uZM*L@q?0hTdq4==!lZE(KW{&w#~ z`=YaUnm8_@tbt|&5<3kq>_Ist!cjnZ0LbSu2=X8bC4^K1vXIa01EeU1ra^VB{D=cr z&cXD6#SR68L^x8O7vWi~qG}L(0Xd?WSZnE~iS>rAtGZ$~Rc(g|mGXWGUcs%OBAmh# zY0i$2F&ZBg;2JWK4vJRb+MnhMohPei!%7z&{iGZ7~5R^j2B%E#U!l&3^k zAK$+t&r{;W#FZ@>Bdpa?vyNMBrD^H9r8FDZ)HT%9H1o1tlS3yU9JkzFI(hsYpV^>u zgLGpiTUTTjbDluHT9YY>Nf>)k$f3R$c>KO52hN(^jY2jEcyF>*SGJAxntn1LkXym(zdK}TsCvwm8Nn&ph*p&2r zuj_CLwI;iR%^@cC&69&ikqoM73l&6xdm<|ZH z=}{U{QA9n*XMASF@rf%I54a>8*)H*UTw@Xn{Q+H-Ci2$~(8%`(efzpgnK*U!UugSp z71o{g^{>n61rP*Y#;5bJ(t`bG*7sN?SONh|39hG;AD+Yc%v}ybm@h~^V<_K2kYEN5 z6V+^XNKfze^p@U5#CV-57bN%qC0-Vr#aUsRhb3zbx5HViac&L@3s5Rx199?hTHvTe zU>+aJJOCl1VG(i#RYORE4rF$w>Mw;<3bH%1*&1JFP@n$(F&sUAzpGfzO7IrA^ODl& q_&1gBd~?5S%>63v6@ Date: Fri, 30 Aug 2024 17:16:23 +0200 Subject: [PATCH 02/14] Lint imports Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 2a4dbca1da1..2b72acb047d 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -16,11 +16,12 @@ package server import ( "encoding/json" "fmt" - "github.com/nats-io/nats.go" "io" "os" "testing" "time" + + "github.com/nats-io/nats.go" ) func metadataAllSet(featureLevel string) map[string]string { From 7fa3b13a1f829746d439d7822226e5546ff47c19 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 17:23:37 +0200 Subject: [PATCH 03/14] Fix for TestJetStreamClusterConsumerActions Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 64f142e0942..cc1f07379c2 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7286,6 +7286,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // if name was set by the user. if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { + // Provided config might miss metadata, include from existing config. + setConsumerAssetVersionMetadata(cfg, ca.Config) + if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp)) From 2c3f9984d57f455748187eb10cbc94a08810b2b5 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 17:26:56 +0200 Subject: [PATCH 04/14] Skip JS versioning tests when running other tests from the server package Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 2b72acb047d..0bf9634aec6 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -11,6 +11,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !skip_js_tests +// +build !skip_js_tests + package server import ( From 1b70d9cdb1c352e69a81eb75d08c60a5e0caf434 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 18:07:12 +0200 Subject: [PATCH 05/14] More comprehensive fix when doing equality check on cfg and prevCfg Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 4 +- server/jetstream_versioning.go | 37 +++++++++++++ server/jetstream_versioning_test.go | 80 +++++++++++++++++++++++++++-- 3 files changed, 114 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index cc1f07379c2..a9863207a23 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7286,8 +7286,8 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // if name was set by the user. if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { - // Provided config might miss metadata, include from existing config. - setConsumerAssetVersionMetadata(cfg, ca.Config) + // Provided config might miss metadata, copy from existing config. + copyConsumerAssetVersionMetadata(cfg, ca.Config) if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 57ca0506509..e0e995ac94d 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -76,6 +76,43 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } +// copyConsumerAssetVersionMetadata copies versioning fields from metadata of prevCfg into cfg. +// Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg. +// +// Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences. +// MUST be followed up with a call to setConsumerAssetVersionMetadata to fix potentially lost metadata. +func copyConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + // Remove fields when no previous metadata. + if prevCfg == nil || prevCfg.Metadata == nil { + if cfg.Metadata != nil { + delete(cfg.Metadata, JSCreatedVersionMetadataKey) + delete(cfg.Metadata, JSCreatedLevelMetadataKey) + delete(cfg.Metadata, JSRequiredLevelMetadataKey) + if len(cfg.Metadata) == 0 { + cfg.Metadata = nil + } + } + return + } + + // Set if exists, delete otherwise. + setOrDeleteInMetadata(cfg, prevCfg, JSCreatedVersionMetadataKey) + setOrDeleteInMetadata(cfg, prevCfg, JSCreatedLevelMetadataKey) + setOrDeleteInMetadata(cfg, prevCfg, JSRequiredLevelMetadataKey) +} + +// setOrDeleteInMetadata sets field with key/value in metadata of cfg if set, deletes otherwise. +func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key string) { + if value, ok := prevCfg.Metadata[key]; ok { + if cfg.Metadata == nil { + cfg.Metadata = make(map[string]string) + } + cfg.Metadata[key] = value + } else { + delete(cfg.Metadata, key) + } +} + // preserveAssetCreatedVersionMetadata sets metadata to contain which version and API level the asset was created on. // Preserves previous metadata, if not set it initializes versions for the metadata. func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]string) { diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 0bf9634aec6..e2659516324 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -51,7 +51,7 @@ func metadataUpdatedPrevious(featureLevel string) map[string]string { } } -func metadataEmpty() map[string]string { +func metadataOnlyRequired() map[string]string { return map[string]string{ JSRequiredLevelMetadataKey: "0", } @@ -86,13 +86,13 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { desc: "update/empty-prev-metadata", cfg: &StreamConfig{}, prev: &StreamConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, { desc: "update/empty-prev-metadata/delete-user-provided", cfg: &StreamConfig{Metadata: metadataPrevious()}, prev: &StreamConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, } { t.Run(test.desc, func(t *testing.T) { @@ -159,13 +159,13 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { desc: "update/empty-prev-metadata", cfg: &ConsumerConfig{}, prev: &ConsumerConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, { desc: "update/empty-prev-metadata/delete-user-provided", cfg: &ConsumerConfig{Metadata: metadataPrevious()}, prev: &ConsumerConfig{}, - expectedMetadata: metadataEmpty(), + expectedMetadata: metadataOnlyRequired(), }, } { t.Run(test.desc, func(t *testing.T) { @@ -177,6 +177,76 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { } } +func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { + for _, test := range []struct { + desc string + cfg *ConsumerConfig + prev *ConsumerConfig + }{ + { + desc: "no-previous-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: nil, + }, + { + desc: "nil-previous-metadata-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: nil}, + }, + { + desc: "nil-current-metadata-ignore", + cfg: &ConsumerConfig{Metadata: nil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + }, + { + desc: "copy-previous", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, + }, + { + desc: "delete-missing-fields", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: make(map[string]string)}, + }, + } { + t.Run(test.desc, func(t *testing.T) { + copyConsumerAssetVersionMetadata(test.cfg, test.prev) + + var expectedMetadata map[string]string + if test.prev != nil { + expectedMetadata = test.prev.Metadata + } + + value, ok := expectedMetadata[JSCreatedVersionMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSCreatedVersionMetadataKey] + require_False(t, ok) + } + + value, ok = expectedMetadata[JSCreatedLevelMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSCreatedLevelMetadataKey] + require_False(t, ok) + } + + value, ok = expectedMetadata[JSRequiredLevelMetadataKey] + if ok { + require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], value) + } else { + // Key shouldn't exist. + _, ok = test.cfg.Metadata[JSRequiredLevelMetadataKey] + require_False(t, ok) + } + }) + } +} + type server struct { replicas int js nats.JetStreamContext From e34a60c7b20c986ce6c031f96d09ec13d784e61a Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 30 Aug 2024 19:26:50 +0200 Subject: [PATCH 06/14] Lint Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 36 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index e2659516324..468cafac2e2 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -179,34 +179,34 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { for _, test := range []struct { - desc string - cfg *ConsumerConfig - prev *ConsumerConfig + desc string + cfg *ConsumerConfig + prev *ConsumerConfig }{ { - desc: "no-previous-ignore", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: nil, + desc: "no-previous-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: nil, }, { - desc: "nil-previous-metadata-ignore", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: &ConsumerConfig{Metadata: nil}, + desc: "nil-previous-metadata-ignore", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: nil}, }, { - desc: "nil-current-metadata-ignore", - cfg: &ConsumerConfig{Metadata: nil}, - prev: &ConsumerConfig{Metadata: metadataPrevious()}, + desc: "nil-current-metadata-ignore", + cfg: &ConsumerConfig{Metadata: nil}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, }, { - desc: "copy-previous", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: &ConsumerConfig{Metadata: metadataPrevious()}, + desc: "copy-previous", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: metadataPrevious()}, }, { - desc: "delete-missing-fields", - cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, - prev: &ConsumerConfig{Metadata: make(map[string]string)}, + desc: "delete-missing-fields", + cfg: &ConsumerConfig{Metadata: metadataAllSet("-1")}, + prev: &ConsumerConfig{Metadata: make(map[string]string)}, }, } { t.Run(test.desc, func(t *testing.T) { From 491aab5e32c230d3603da806697876ee0fbd55f2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 3 Sep 2024 13:42:20 +0200 Subject: [PATCH 07/14] Simplify validation Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 38 +++++++++++------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 468cafac2e2..cd4bad3d1d0 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -281,29 +281,29 @@ func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { } } +func validateMetadata(metadata map[string]string, expectedFeatureLevel string) bool { + return metadata[JSCreatedVersionMetadataKey] == VERSION || + metadata[JSCreatedLevelMetadataKey] == JSApiLevel || + metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel +} + func streamAssetVersionChecks(t *testing.T, s server) { // Add stream. sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} si, err := s.js.AddStream(&sc) require_NoError(t, err) - require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(si.Config.Metadata, "0")) // Stream info. si, err = s.js.StreamInfo(streamName) require_NoError(t, err) - require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(si.Config.Metadata, "0")) // Update stream. // Metadata set on creation should be preserved, even if not included in update. si, err = s.js.UpdateStream(&sc) require_NoError(t, err) - require_Equal(t, si.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, si.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, si.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(si.Config.Metadata, "0")) } func consumerAssetVersionChecks(t *testing.T, s server) { @@ -311,24 +311,18 @@ func consumerAssetVersionChecks(t *testing.T, s server) { cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} ci, err := s.js.AddConsumer(streamName, &cc) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) // Consumer info. ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) // Update consumer. // Metadata set on creation should be preserved, even if not included in update. ci, err = s.js.UpdateConsumer(streamName, &cc) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) // Use pause advisories to know when pause/resume is applied. pauseCh := make(chan *nats.Msg, 10) @@ -342,9 +336,7 @@ func consumerAssetVersionChecks(t *testing.T, s server) { ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "1") + require_True(t, validateMetadata(ci.Config.Metadata, "1")) // Unpause consumer, should lower required API level. subj := fmt.Sprintf("$JS.API.CONSUMER.PAUSE.%s.%s", streamName, consumerName) @@ -355,9 +347,7 @@ func consumerAssetVersionChecks(t *testing.T, s server) { ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) - require_Equal(t, ci.Config.Metadata[JSCreatedVersionMetadataKey], VERSION) - require_Equal(t, ci.Config.Metadata[JSCreatedLevelMetadataKey], JSApiLevel) - require_Equal(t, ci.Config.Metadata[JSRequiredLevelMetadataKey], "0") + require_True(t, validateMetadata(ci.Config.Metadata, "0")) } func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { From 768f8c210a8acc457dd005f3fe82c6e4b97fbc7e Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 2 Sep 2024 16:50:50 +0200 Subject: [PATCH 08/14] (2.11) ADR-44: JetStream Dynamic Metadata Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 14 +-- server/jetstream_cluster.go | 12 +- server/jetstream_versioning.go | 57 ++++++++++ server/jetstream_versioning_test.go | 163 ++++++++++++++++++++++++++-- 4 files changed, 223 insertions(+), 23 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index dbd93015d24..e91aaa921bb 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1478,7 +1478,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: mset.config(), + Config: includeDynamicStreamAssetVersionMetadata(mset.config()), TimeStamp: time.Now().UTC(), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -1572,7 +1572,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: mset.config(), + Config: includeDynamicStreamAssetVersionMetadata(mset.config()), Domain: s.getOpts().JetStreamDomain, Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -1955,7 +1955,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.stateWithDetail(details), - Config: config, + Config: includeDynamicStreamAssetVersionMetadata(config), Domain: s.getOpts().JetStreamDomain, Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), @@ -3594,7 +3594,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: mset.config(), + Config: includeDynamicStreamAssetVersionMetadata(mset.config()), TimeStamp: time.Now().UTC(), } s.Noticef("Completed restore of %s for stream '%s > %s' in %v", @@ -4057,7 +4057,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - resp.ConsumerInfo = o.initialInfo() + resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.initialInfo()) s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { @@ -4403,7 +4403,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, Stream: ca.Stream, Name: ca.Name, Created: ca.Created, - Config: ca.Config, + Config: includeDynamicConsumerAssetVersionMetadata(ca.Config), TimeStamp: time.Now().UTC(), } b := s.jsonResponse(resp) @@ -4446,7 +4446,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, return } - if resp.ConsumerInfo = obs.info(); resp.ConsumerInfo == nil { + if resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(obs.info()); resp.ConsumerInfo == nil { // This consumer returned nil which means it's closed. Respond with not found. resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a9863207a23..57d34457b36 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3278,7 +3278,7 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: mset.config(), + Config: includeDynamicStreamAssetVersionMetadata(mset.config()), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), @@ -3708,7 +3708,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: mset.config(), + Config: includeDynamicStreamAssetVersionMetadata(mset.config()), Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -3775,7 +3775,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: mset.config(), + Config: includeDynamicStreamAssetVersionMetadata(mset.config()), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), @@ -4484,7 +4484,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state client, subject, reply := ca.Client, ca.Subject, ca.Reply js.mu.Unlock() var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = o.info() + resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.info()) s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) return } @@ -4522,7 +4522,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state js.mu.RUnlock() if !recovering { var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = o.info() + resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.info()) s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } } @@ -5222,7 +5222,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err resp.Error = NewJSConsumerCreateError(err, Unless(err)) s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { - resp.ConsumerInfo = o.initialInfo() + resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.initialInfo()) s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) o.sendCreateAdvisory() } diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index e0e995ac94d..1a6337324c5 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -20,15 +20,21 @@ const ( JSCreatedVersionMetadataKey = "_nats.created.server.version" JSCreatedLevelMetadataKey = "_nats.created.server.api_level" JSRequiredLevelMetadataKey = "_nats.server.require.api_level" + JSServerVersionMetadataKey = "_nats.server.version" + JSServerLevelMetadataKey = "_nats.server.api_level" ) // setStreamAssetVersionMetadata sets JetStream stream metadata, like the server version and API level. // Given: // - cfg!=nil, prevCfg==nil add stream: adds created and required metadata // - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated +// +// Any dynamic metadata is removed, it must not be stored and only be added for responses. func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { if cfg.Metadata == nil { cfg.Metadata = make(map[string]string) + } else { + deleteDynamicMetadata(cfg.Metadata) } var prevMetadata map[string]string @@ -45,13 +51,29 @@ func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } +// includeDynamicStreamAssetVersionMetadata adds dynamic fields into the (copied) metadata. +func includeDynamicStreamAssetVersionMetadata(cfg StreamConfig) StreamConfig { + newCfg := cfg + newCfg.Metadata = make(map[string]string) + for key, value := range cfg.Metadata { + newCfg.Metadata[key] = value + } + newCfg.Metadata[JSServerVersionMetadataKey] = VERSION + newCfg.Metadata[JSServerLevelMetadataKey] = JSApiLevel + return newCfg +} + // setConsumerAssetVersionMetadata sets JetStream consumer metadata, like the server version and API level. // Given: // - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata // - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated +// +// Any dynamic metadata is removed, it must not be stored and only be added for responses. func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { if cfg.Metadata == nil { cfg.Metadata = make(map[string]string) + } else { + deleteDynamicMetadata(cfg.Metadata) } var prevMetadata map[string]string @@ -76,12 +98,41 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } +// includeDynamicConsumerAssetVersionMetadata adds dynamic fields into the (copied) metadata. +func includeDynamicConsumerAssetVersionMetadata(cfg *ConsumerConfig) *ConsumerConfig { + newCfg := *cfg + newCfg.Metadata = make(map[string]string) + for key, value := range cfg.Metadata { + newCfg.Metadata[key] = value + } + newCfg.Metadata[JSServerVersionMetadataKey] = VERSION + newCfg.Metadata[JSServerLevelMetadataKey] = JSApiLevel + return &newCfg +} + +// includeDynamicConsumerInfoVersionMetadata adds dynamic fields into the (copied) metadata. +func includeDynamicConsumerInfoVersionMetadata(info *ConsumerInfo) *ConsumerInfo { + if info == nil { + return nil + } + + newInfo := *info + cfg := includeDynamicConsumerAssetVersionMetadata(info.Config) + newInfo.Config = cfg + return &newInfo +} + // copyConsumerAssetVersionMetadata copies versioning fields from metadata of prevCfg into cfg. // Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg. +// Any dynamic metadata is removed, it must not be stored and only be added for responses. // // Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences. // MUST be followed up with a call to setConsumerAssetVersionMetadata to fix potentially lost metadata. func copyConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { + if cfg.Metadata != nil { + deleteDynamicMetadata(cfg.Metadata) + } + // Remove fields when no previous metadata. if prevCfg == nil || prevCfg.Metadata == nil { if cfg.Metadata != nil { @@ -134,3 +185,9 @@ func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]strin delete(metadata, JSCreatedLevelMetadataKey) } } + +// deleteDynamicMetadata deletes dynamic fields from the metadata. +func deleteDynamicMetadata(metadata map[string]string) { + delete(metadata, JSServerVersionMetadataKey) + delete(metadata, JSServerLevelMetadataKey) +} diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index cd4bad3d1d0..1766c976615 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "reflect" "testing" "time" @@ -104,6 +105,36 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { } } +func TestJetStreamSetStreamAssetVersionMetadataRemoveDynamicFields(t *testing.T) { + dynamicMetadata := func() map[string]string { + return map[string]string{ + JSServerVersionMetadataKey: "dynamic-version", + JSServerLevelMetadataKey: "dynamic-version", + } + } + + cfg := StreamConfig{Metadata: dynamicMetadata()} + setStreamAssetVersionMetadata(&cfg, nil) + require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) + + cfg = StreamConfig{Metadata: dynamicMetadata()} + prevCfg := StreamConfig{Metadata: metadataAllSet("0")} + setStreamAssetVersionMetadata(&cfg, &prevCfg) + require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) +} + +func TestJetStreamIncludeDynamicStreamAssetVersionMetadata(t *testing.T) { + cfg := StreamConfig{Metadata: metadataAllSet("0")} + newCfg := includeDynamicStreamAssetVersionMetadata(cfg) + + // Only new metadata must contain dynamic fields. + metadata := metadataAllSet("0") + require_True(t, reflect.DeepEqual(cfg.Metadata, metadata)) + metadata[JSServerVersionMetadataKey] = VERSION + metadata[JSServerLevelMetadataKey] = JSApiLevel + require_True(t, reflect.DeepEqual(newCfg.Metadata, metadata)) +} + func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { pauseUntil := time.Unix(0, 0) pauseUntilZero := time.Time{} @@ -177,6 +208,51 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { } } +func TestJetStreamSetConsumerAssetVersionMetadataRemoveDynamicFields(t *testing.T) { + dynamicMetadata := func() map[string]string { + return map[string]string{ + JSServerVersionMetadataKey: "dynamic-version", + JSServerLevelMetadataKey: "dynamic-version", + } + } + + cfg := ConsumerConfig{Metadata: dynamicMetadata()} + setConsumerAssetVersionMetadata(&cfg, nil) + require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) + + cfg = ConsumerConfig{Metadata: dynamicMetadata()} + prevCfg := ConsumerConfig{Metadata: metadataAllSet("0")} + setConsumerAssetVersionMetadata(&cfg, &prevCfg) + require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) +} + +func TestJetStreamIncludeDynamicConsumerAssetVersionMetadata(t *testing.T) { + cfg := ConsumerConfig{Metadata: metadataAllSet("0")} + newCfg := includeDynamicConsumerAssetVersionMetadata(&cfg) + + // Only new metadata must contain dynamic fields. + metadata := metadataAllSet("0") + require_True(t, reflect.DeepEqual(cfg.Metadata, metadata)) + metadata[JSServerVersionMetadataKey] = VERSION + metadata[JSServerLevelMetadataKey] = JSApiLevel + require_True(t, reflect.DeepEqual(newCfg.Metadata, metadata)) +} + +func TestJetStreamIncludeDynamicConsumerInfoVersionMetadata(t *testing.T) { + ci := ConsumerInfo{Config: &ConsumerConfig{Metadata: metadataAllSet("0")}} + newCi := includeDynamicConsumerInfoVersionMetadata(&ci) + + // Configs should not equal, as that would mean we've overwritten the original ConsumerInfo. + require_False(t, reflect.DeepEqual(ci, newCi)) + + // Only new metadata must contain dynamic fields. + metadata := metadataAllSet("0") + require_True(t, reflect.DeepEqual(ci.Config.Metadata, metadata)) + metadata[JSServerVersionMetadataKey] = VERSION + metadata[JSServerLevelMetadataKey] = JSApiLevel + require_True(t, reflect.DeepEqual(newCi.Config.Metadata, metadata)) +} + func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { for _, test := range []struct { desc string @@ -247,6 +323,24 @@ func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { } } +func TestJetStreamCopyConsumerAssetVersionMetadataRemoveDynamicFields(t *testing.T) { + dynamicMetadata := func() map[string]string { + return map[string]string{ + JSServerVersionMetadataKey: "dynamic-version", + JSServerLevelMetadataKey: "dynamic-version", + } + } + + cfg := ConsumerConfig{Metadata: dynamicMetadata()} + copyConsumerAssetVersionMetadata(&cfg, nil) + require_Equal(t, len(cfg.Metadata), 0) + + cfg = ConsumerConfig{Metadata: dynamicMetadata()} + prevCfg := ConsumerConfig{Metadata: metadataAllSet("0")} + copyConsumerAssetVersionMetadata(&cfg, &prevCfg) + require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) +} + type server struct { replicas int js nats.JetStreamContext @@ -284,7 +378,9 @@ func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { func validateMetadata(metadata map[string]string, expectedFeatureLevel string) bool { return metadata[JSCreatedVersionMetadataKey] == VERSION || metadata[JSCreatedLevelMetadataKey] == JSApiLevel || - metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel + metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel || + metadata[JSServerVersionMetadataKey] == VERSION || + metadata[JSServerLevelMetadataKey] == JSApiLevel } func streamAssetVersionChecks(t *testing.T, s server) { @@ -294,6 +390,12 @@ func streamAssetVersionChecks(t *testing.T, s server) { require_NoError(t, err) require_True(t, validateMetadata(si.Config.Metadata, "0")) + // (Double) add stream, has different code path for clustered streams. + sc = nats.StreamConfig{Name: streamName, Replicas: s.replicas} + si, err = s.js.AddStream(&sc) + require_NoError(t, err) + require_True(t, validateMetadata(si.Config.Metadata, "0")) + // Stream info. si, err = s.js.StreamInfo(streamName) require_NoError(t, err) @@ -348,6 +450,29 @@ func consumerAssetVersionChecks(t *testing.T, s server) { ci, err = s.js.ConsumerInfo(streamName, consumerName) require_NoError(t, err) require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + // Test scaling up/down. + if s.replicas == 3 { + // Scale down. + cc.Replicas = 1 + ci, err = s.js.UpdateConsumer(streamName, &cc) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + // Scale up. + cc.Replicas = 3 + ci, err = s.js.UpdateConsumer(streamName, &cc) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + + ci, err = s.js.ConsumerInfo(streamName, consumerName) + require_NoError(t, err) + require_True(t, validateMetadata(ci.Config.Metadata, "0")) + } } func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { @@ -359,10 +484,15 @@ func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { path := "../test/configs/jetstream/restore_empty_R1F_stream" restoreStreamFromPath(t, nc, path) - // Stream restore should result in empty metadata to be preserved. + expectedMetadata := map[string]string{ + JSServerVersionMetadataKey: VERSION, + JSServerLevelMetadataKey: JSApiLevel, + } + + // Stream restore should result in empty metadata to be preserved, only adding dynamic metadata. si, err := js.StreamInfo(streamName) require_NoError(t, err) - require_Equal(t, len(si.Config.Metadata), 0) + require_True(t, reflect.DeepEqual(si.Config.Metadata, expectedMetadata)) // Restart server. port := s.opts.Port @@ -374,10 +504,10 @@ func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { nc, js = jsClientConnect(t, s) defer nc.Close() - // After restart (or upgrade) metadata data should remain empty. + // After restart (or upgrade) metadata data should remain empty, only adding dynamic metadata. si, err = js.StreamInfo(streamName) require_NoError(t, err) - require_Equal(t, len(si.Config.Metadata), 0) + require_True(t, reflect.DeepEqual(si.Config.Metadata, expectedMetadata)) } func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing.T) { @@ -389,10 +519,15 @@ func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing. path := "../test/configs/jetstream/restore_empty_R3F_stream" restoreStreamFromPath(t, nc, path) - // Stream restore should result in empty metadata to be preserved. + expectedMetadata := map[string]string{ + JSServerVersionMetadataKey: VERSION, + JSServerLevelMetadataKey: JSApiLevel, + } + + // Stream restore should result in empty metadata to be preserved, only adding dynamic metadata. si, err := js.StreamInfo(streamName) require_NoError(t, err) - require_Equal(t, len(si.Config.Metadata), 0) + require_True(t, reflect.DeepEqual(si.Config.Metadata, expectedMetadata)) // Restart cluster. c.stopAll() @@ -403,10 +538,10 @@ func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing. nc, js = jsClientConnect(t, c.randomServer()) defer nc.Close() - // After restart (or upgrade) metadata data should remain empty. + // After restart (or upgrade) metadata data should remain empty, only adding dynamic metadata. si, err = js.StreamInfo(streamName) require_NoError(t, err) - require_Equal(t, len(si.Config.Metadata), 0) + require_True(t, reflect.DeepEqual(si.Config.Metadata, expectedMetadata)) } func restoreStreamFromPath(t *testing.T, nc *nats.Conn, path string) { @@ -443,8 +578,16 @@ func restoreStreamFromPath(t *testing.T, nc *nats.Conn, path string) { t.Fatalf("Error on restore: %+v", rresp.Error) } } + + expectedMetadata := map[string]string{ + JSServerVersionMetadataKey: VERSION, + JSServerLevelMetadataKey: JSApiLevel, + } + msg, err = nc.Request(rresp.DeliverSubject, nil, 5*time.Second) require_NoError(t, err) - err = json.Unmarshal(msg.Data, &rresp) + var cresp JSApiStreamCreateResponse + err = json.Unmarshal(msg.Data, &cresp) require_NoError(t, err) + require_True(t, reflect.DeepEqual(cresp.Config.Metadata, expectedMetadata)) } From 7598505f110b6ec818504db50c9fa60a4926832b Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 09:53:41 +0200 Subject: [PATCH 09/14] Remove extra copy Signed-off-by: Maurice van Veen --- server/jetstream_versioning.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 1a6337324c5..135dd2c3418 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -53,14 +53,14 @@ func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { // includeDynamicStreamAssetVersionMetadata adds dynamic fields into the (copied) metadata. func includeDynamicStreamAssetVersionMetadata(cfg StreamConfig) StreamConfig { - newCfg := cfg - newCfg.Metadata = make(map[string]string) - for key, value := range cfg.Metadata { - newCfg.Metadata[key] = value + prevMetadata := cfg.Metadata + cfg.Metadata = make(map[string]string) + for key, value := range prevMetadata { + cfg.Metadata[key] = value } - newCfg.Metadata[JSServerVersionMetadataKey] = VERSION - newCfg.Metadata[JSServerLevelMetadataKey] = JSApiLevel - return newCfg + cfg.Metadata[JSServerVersionMetadataKey] = VERSION + cfg.Metadata[JSServerLevelMetadataKey] = JSApiLevel + return cfg } // setConsumerAssetVersionMetadata sets JetStream consumer metadata, like the server version and API level. From 0f60da2f171be20f96204e3e8db3efee1602a601 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 10:03:34 +0200 Subject: [PATCH 10/14] Shorten function names Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 14 +++++++------- server/jetstream_cluster.go | 12 ++++++------ server/jetstream_versioning.go | 14 +++++++------- server/jetstream_versioning_test.go | 12 ++++++------ 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index e91aaa921bb..cece0241880 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1478,7 +1478,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: includeDynamicStreamAssetVersionMetadata(mset.config()), + Config: setDynamicStreamMetadata(mset.config()), TimeStamp: time.Now().UTC(), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -1572,7 +1572,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: includeDynamicStreamAssetVersionMetadata(mset.config()), + Config: setDynamicStreamMetadata(mset.config()), Domain: s.getOpts().JetStreamDomain, Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -1955,7 +1955,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.stateWithDetail(details), - Config: includeDynamicStreamAssetVersionMetadata(config), + Config: setDynamicStreamMetadata(config), Domain: s.getOpts().JetStreamDomain, Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), @@ -3594,7 +3594,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: includeDynamicStreamAssetVersionMetadata(mset.config()), + Config: setDynamicStreamMetadata(mset.config()), TimeStamp: time.Now().UTC(), } s.Noticef("Completed restore of %s for stream '%s > %s' in %v", @@ -4057,7 +4057,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.initialInfo()) + resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo()) s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { @@ -4403,7 +4403,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, Stream: ca.Stream, Name: ca.Name, Created: ca.Created, - Config: includeDynamicConsumerAssetVersionMetadata(ca.Config), + Config: setDynamicConsumerMetadata(ca.Config), TimeStamp: time.Now().UTC(), } b := s.jsonResponse(resp) @@ -4446,7 +4446,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, return } - if resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(obs.info()); resp.ConsumerInfo == nil { + if resp.ConsumerInfo = setDynamicConsumerInfoMetadata(obs.info()); resp.ConsumerInfo == nil { // This consumer returned nil which means it's closed. Respond with not found. resp.Error = NewJSConsumerNotFoundError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 57d34457b36..ce47dca407f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3278,7 +3278,7 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: includeDynamicStreamAssetVersionMetadata(mset.config()), + Config: setDynamicStreamMetadata(mset.config()), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), @@ -3708,7 +3708,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: includeDynamicStreamAssetVersionMetadata(mset.config()), + Config: setDynamicStreamMetadata(mset.config()), Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -3775,7 +3775,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: includeDynamicStreamAssetVersionMetadata(mset.config()), + Config: setDynamicStreamMetadata(mset.config()), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), @@ -4484,7 +4484,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state client, subject, reply := ca.Client, ca.Subject, ca.Reply js.mu.Unlock() var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.info()) + resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info()) s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) return } @@ -4522,7 +4522,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state js.mu.RUnlock() if !recovering { var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.info()) + resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info()) s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } } @@ -5222,7 +5222,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err resp.Error = NewJSConsumerCreateError(err, Unless(err)) s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { - resp.ConsumerInfo = includeDynamicConsumerInfoVersionMetadata(o.initialInfo()) + resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.initialInfo()) s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) o.sendCreateAdvisory() } diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 135dd2c3418..2c0efcfed0a 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -51,8 +51,8 @@ func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } -// includeDynamicStreamAssetVersionMetadata adds dynamic fields into the (copied) metadata. -func includeDynamicStreamAssetVersionMetadata(cfg StreamConfig) StreamConfig { +// setDynamicStreamMetadata adds dynamic fields into the (copied) metadata. +func setDynamicStreamMetadata(cfg StreamConfig) StreamConfig { prevMetadata := cfg.Metadata cfg.Metadata = make(map[string]string) for key, value := range prevMetadata { @@ -98,8 +98,8 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } -// includeDynamicConsumerAssetVersionMetadata adds dynamic fields into the (copied) metadata. -func includeDynamicConsumerAssetVersionMetadata(cfg *ConsumerConfig) *ConsumerConfig { +// setDynamicConsumerMetadata adds dynamic fields into the (copied) metadata. +func setDynamicConsumerMetadata(cfg *ConsumerConfig) *ConsumerConfig { newCfg := *cfg newCfg.Metadata = make(map[string]string) for key, value := range cfg.Metadata { @@ -110,14 +110,14 @@ func includeDynamicConsumerAssetVersionMetadata(cfg *ConsumerConfig) *ConsumerCo return &newCfg } -// includeDynamicConsumerInfoVersionMetadata adds dynamic fields into the (copied) metadata. -func includeDynamicConsumerInfoVersionMetadata(info *ConsumerInfo) *ConsumerInfo { +// setDynamicConsumerInfoMetadata adds dynamic fields into the (copied) metadata. +func setDynamicConsumerInfoMetadata(info *ConsumerInfo) *ConsumerInfo { if info == nil { return nil } newInfo := *info - cfg := includeDynamicConsumerAssetVersionMetadata(info.Config) + cfg := setDynamicConsumerMetadata(info.Config) newInfo.Config = cfg return &newInfo } diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 1766c976615..89a82e2d74e 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -123,9 +123,9 @@ func TestJetStreamSetStreamAssetVersionMetadataRemoveDynamicFields(t *testing.T) require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) } -func TestJetStreamIncludeDynamicStreamAssetVersionMetadata(t *testing.T) { +func TestJetStreamSetDynamicStreamMetadata(t *testing.T) { cfg := StreamConfig{Metadata: metadataAllSet("0")} - newCfg := includeDynamicStreamAssetVersionMetadata(cfg) + newCfg := setDynamicStreamMetadata(cfg) // Only new metadata must contain dynamic fields. metadata := metadataAllSet("0") @@ -226,9 +226,9 @@ func TestJetStreamSetConsumerAssetVersionMetadataRemoveDynamicFields(t *testing. require_True(t, reflect.DeepEqual(cfg.Metadata, metadataAllSet("0"))) } -func TestJetStreamIncludeDynamicConsumerAssetVersionMetadata(t *testing.T) { +func TestJetStreamSetDynamicConsumerMetadata(t *testing.T) { cfg := ConsumerConfig{Metadata: metadataAllSet("0")} - newCfg := includeDynamicConsumerAssetVersionMetadata(&cfg) + newCfg := setDynamicConsumerMetadata(&cfg) // Only new metadata must contain dynamic fields. metadata := metadataAllSet("0") @@ -238,9 +238,9 @@ func TestJetStreamIncludeDynamicConsumerAssetVersionMetadata(t *testing.T) { require_True(t, reflect.DeepEqual(newCfg.Metadata, metadata)) } -func TestJetStreamIncludeDynamicConsumerInfoVersionMetadata(t *testing.T) { +func TestJetStreamSetDynamicConsumerInfoMetadata(t *testing.T) { ci := ConsumerInfo{Config: &ConsumerConfig{Metadata: metadataAllSet("0")}} - newCi := includeDynamicConsumerInfoVersionMetadata(&ci) + newCi := setDynamicConsumerInfoMetadata(&ci) // Configs should not equal, as that would mean we've overwritten the original ConsumerInfo. require_False(t, reflect.DeepEqual(ci, newCi)) From 604cc0e20207f52f9602a15112a8c590a709b150 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 10:09:50 +0200 Subject: [PATCH 11/14] Shorten function names Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 10 +++++----- server/jetstream_cluster.go | 6 +++--- server/jetstream_versioning.go | 22 +++++++++++----------- server/jetstream_versioning_test.go | 26 +++++++++++++------------- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index dbd93015d24..c56a529ec1e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1423,7 +1423,7 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } // Initialize asset version metadata. - setStreamAssetVersionMetadata(&cfg.StreamConfig, nil) + setStaticStreamMetadata(&cfg.StreamConfig, nil) streamName := streamNameFromSubject(subject) if streamName != cfg.Name { @@ -1561,7 +1561,7 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } // Update asset version metadata. - setStreamAssetVersionMetadata(&cfg, &mset.cfg) + setStaticStreamMetadata(&cfg, &mset.cfg) if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil { resp.Error = NewJSStreamUpdateError(err, Unless(err)) @@ -4043,7 +4043,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } // Initialize/update asset version metadata. - setConsumerAssetVersionMetadata(&req.Config, oldCfg) + setStaticConsumerMetadata(&req.Config, oldCfg) o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic) @@ -4603,7 +4603,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account // Update asset version metadata due to updating pause/resume. // Only PauseUntil is updated above, so reuse config for both. - setConsumerAssetVersionMetadata(nca.Config, nca.Config) + setStaticConsumerMetadata(nca.Config, nca.Config) eca := encodeAddConsumerAssignment(&nca) cc.meta.Propose(eca) @@ -4640,7 +4640,7 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account // Update asset version metadata due to updating pause/resume. // Only PauseUntil is updated above, so reuse config for both. - setConsumerAssetVersionMetadata(&ncfg, &ncfg) + setStaticConsumerMetadata(&ncfg, &ncfg) if err := obs.updateConfig(&ncfg); err != nil { // The only type of error that should be returned here is from o.store, diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index a9863207a23..16dfabc32b7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6246,7 +6246,7 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } // Update asset version metadata. - setStreamAssetVersionMetadata(cfg, osa.Config) + setStaticStreamMetadata(cfg, osa.Config) var newCfg *StreamConfig if jsa := js.accounts[acc.Name]; jsa != nil { @@ -7287,7 +7287,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec if oname != _EMPTY_ { if ca = sa.consumers[oname]; ca != nil && !ca.deleted { // Provided config might miss metadata, copy from existing config. - copyConsumerAssetVersionMetadata(cfg, ca.Config) + copyConsumerMetadata(cfg, ca.Config) if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) { resp.Error = NewJSConsumerAlreadyExistsError() @@ -7308,7 +7308,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec if ca != nil { oldCfg = ca.Config } - setConsumerAssetVersionMetadata(cfg, oldCfg) + setStaticConsumerMetadata(cfg, oldCfg) // If this is new consumer. if ca == nil { diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index e0e995ac94d..60b0658cc3f 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -22,11 +22,11 @@ const ( JSRequiredLevelMetadataKey = "_nats.server.require.api_level" ) -// setStreamAssetVersionMetadata sets JetStream stream metadata, like the server version and API level. +// setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level. // Given: // - cfg!=nil, prevCfg==nil add stream: adds created and required metadata // - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated -func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { +func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { if cfg.Metadata == nil { cfg.Metadata = make(map[string]string) } @@ -39,17 +39,17 @@ func setStreamAssetVersionMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { prevMetadata = make(map[string]string) } } - preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + preserveCreatedMetadata(cfg.Metadata, prevMetadata) requiredApiLevel := "0" cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } -// setConsumerAssetVersionMetadata sets JetStream consumer metadata, like the server version and API level. +// setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level. // Given: // - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata // - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated -func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { +func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { if cfg.Metadata == nil { cfg.Metadata = make(map[string]string) } @@ -62,7 +62,7 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi prevMetadata = make(map[string]string) } } - preserveAssetCreatedVersionMetadata(cfg.Metadata, prevMetadata) + preserveCreatedMetadata(cfg.Metadata, prevMetadata) requiredApiLevel := "0" @@ -76,12 +76,12 @@ func setConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfi cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel } -// copyConsumerAssetVersionMetadata copies versioning fields from metadata of prevCfg into cfg. +// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg. // Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg. // // Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences. -// MUST be followed up with a call to setConsumerAssetVersionMetadata to fix potentially lost metadata. -func copyConsumerAssetVersionMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { +// MUST be followed up with a call to setStaticConsumerMetadata to fix potentially lost metadata. +func copyConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { // Remove fields when no previous metadata. if prevCfg == nil || prevCfg.Metadata == nil { if cfg.Metadata != nil { @@ -113,9 +113,9 @@ func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key str } } -// preserveAssetCreatedVersionMetadata sets metadata to contain which version and API level the asset was created on. +// preserveCreatedMetadata sets metadata to contain which version and API level the asset was created on. // Preserves previous metadata, if not set it initializes versions for the metadata. -func preserveAssetCreatedVersionMetadata(metadata, prevMetadata map[string]string) { +func preserveCreatedMetadata(metadata, prevMetadata map[string]string) { if prevMetadata == nil { metadata[JSCreatedVersionMetadataKey] = VERSION metadata[JSCreatedLevelMetadataKey] = JSApiLevel diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index cd4bad3d1d0..7b2b5ac0909 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -57,7 +57,7 @@ func metadataOnlyRequired() map[string]string { } } -func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { +func TestJetStreamSetStaticStreamMetadata(t *testing.T) { for _, test := range []struct { desc string cfg *StreamConfig @@ -96,7 +96,7 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - setStreamAssetVersionMetadata(test.cfg, test.prev) + setStaticStreamMetadata(test.cfg, test.prev) require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) @@ -104,7 +104,7 @@ func TestJetStreamSetStreamAssetVersionMetadata(t *testing.T) { } } -func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { +func TestJetStreamSetStaticConsumerMetadata(t *testing.T) { pauseUntil := time.Unix(0, 0) pauseUntilZero := time.Time{} for _, test := range []struct { @@ -169,7 +169,7 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - setConsumerAssetVersionMetadata(test.cfg, test.prev) + setStaticConsumerMetadata(test.cfg, test.prev) require_Equal(t, test.cfg.Metadata[JSCreatedVersionMetadataKey], test.expectedMetadata[JSCreatedVersionMetadataKey]) require_Equal(t, test.cfg.Metadata[JSCreatedLevelMetadataKey], test.expectedMetadata[JSCreatedLevelMetadataKey]) require_Equal(t, test.cfg.Metadata[JSRequiredLevelMetadataKey], test.expectedMetadata[JSRequiredLevelMetadataKey]) @@ -177,7 +177,7 @@ func TestJetStreamSetConsumerAssetVersionMetadata(t *testing.T) { } } -func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { +func TestJetStreamCopyConsumerMetadata(t *testing.T) { for _, test := range []struct { desc string cfg *ConsumerConfig @@ -210,7 +210,7 @@ func TestJetStreamCopyConsumerAssetVersionMetadata(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - copyConsumerAssetVersionMetadata(test.cfg, test.prev) + copyConsumerMetadata(test.cfg, test.prev) var expectedMetadata map[string]string if test.prev != nil { @@ -258,7 +258,7 @@ const ( consumerName = "CONSUMER" ) -func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { +func TestJetStreamMetadataMutations(t *testing.T) { single := RunBasicJetStreamServer(t) defer single.Shutdown() nc, js := jsClientConnect(t, single) @@ -275,8 +275,8 @@ func TestJetStreamAssetVersionMetadataMutations(t *testing.T) { {3, cjs, cnc}, } { t.Run(fmt.Sprintf("R%d", s.replicas), func(t *testing.T) { - streamAssetVersionChecks(t, s) - consumerAssetVersionChecks(t, s) + streamMetadataChecks(t, s) + consumerMetadataChecks(t, s) }) } } @@ -287,7 +287,7 @@ func validateMetadata(metadata map[string]string, expectedFeatureLevel string) b metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel } -func streamAssetVersionChecks(t *testing.T, s server) { +func streamMetadataChecks(t *testing.T, s server) { // Add stream. sc := nats.StreamConfig{Name: streamName, Replicas: s.replicas} si, err := s.js.AddStream(&sc) @@ -306,7 +306,7 @@ func streamAssetVersionChecks(t *testing.T, s server) { require_True(t, validateMetadata(si.Config.Metadata, "0")) } -func consumerAssetVersionChecks(t *testing.T, s server) { +func consumerMetadataChecks(t *testing.T, s server) { // Add consumer. cc := nats.ConsumerConfig{Name: consumerName, Replicas: s.replicas} ci, err := s.js.AddConsumer(streamName, &cc) @@ -350,7 +350,7 @@ func consumerAssetVersionChecks(t *testing.T, s server) { require_True(t, validateMetadata(ci.Config.Metadata, "0")) } -func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { +func TestJetStreamMetadataStreamRestoreAndRestart(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() nc, js := jsClientConnect(t, s) @@ -380,7 +380,7 @@ func TestJetStreamAssetVersionMetadataStreamRestoreAndRestart(t *testing.T) { require_Equal(t, len(si.Config.Metadata), 0) } -func TestJetStreamAssetVersionMetadataStreamRestoreAndRestartCluster(t *testing.T) { +func TestJetStreamMetadataStreamRestoreAndRestartCluster(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3) defer c.shutdown() nc, js := jsClientConnect(t, c.randomServer()) From 5dd64fe35dae3d56c0232abbd286ea31630052cc Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 10:24:48 +0200 Subject: [PATCH 12/14] Update test names Signed-off-by: Maurice van Veen --- server/jetstream_versioning_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index de8ab80b63e..e478631df63 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -105,7 +105,7 @@ func TestJetStreamSetStaticStreamMetadata(t *testing.T) { } } -func TestJetStreamSetStreamAssetVersionMetadataRemoveDynamicFields(t *testing.T) { +func TestJetStreamSetStaticStreamMetadataRemoveDynamicFields(t *testing.T) { dynamicMetadata := func() map[string]string { return map[string]string{ JSServerVersionMetadataKey: "dynamic-version", @@ -208,7 +208,7 @@ func TestJetStreamSetStaticConsumerMetadata(t *testing.T) { } } -func TestJetStreamSetConsumerAssetVersionMetadataRemoveDynamicFields(t *testing.T) { +func TestJetStreamSetStaticConsumerMetadataRemoveDynamicFields(t *testing.T) { dynamicMetadata := func() map[string]string { return map[string]string{ JSServerVersionMetadataKey: "dynamic-version", @@ -323,7 +323,7 @@ func TestJetStreamCopyConsumerMetadata(t *testing.T) { } } -func TestJetStreamCopyConsumerAssetVersionMetadataRemoveDynamicFields(t *testing.T) { +func TestJetStreamCopyConsumerMetadataRemoveDynamicFields(t *testing.T) { dynamicMetadata := func() map[string]string { return map[string]string{ JSServerVersionMetadataKey: "dynamic-version", From ca9ec9513932da2bc1ec57d40dc2325f43d4c660 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 5 Sep 2024 12:52:20 +0200 Subject: [PATCH 13/14] Use int for JSApiLevel Signed-off-by: Maurice van Veen --- server/jetstream_versioning.go | 16 +++++++++------- server/jetstream_versioning_test.go | 5 +++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index 60b0658cc3f..1c49468650e 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -13,9 +13,11 @@ package server +import "strconv" + const ( // JSApiLevel is the maximum supported JetStream API level for this server. - JSApiLevel = "1" + JSApiLevel int = 1 JSCreatedVersionMetadataKey = "_nats.created.server.version" JSCreatedLevelMetadataKey = "_nats.created.server.api_level" @@ -41,8 +43,8 @@ func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { } preserveCreatedMetadata(cfg.Metadata, prevMetadata) - requiredApiLevel := "0" - cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel + var requiredApiLevel int + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) } // setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level. @@ -64,16 +66,16 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) { } preserveCreatedMetadata(cfg.Metadata, prevMetadata) - requiredApiLevel := "0" + var requiredApiLevel int // Added in 2.11, absent | zero is the feature is not used. // one could be stricter and say even if its set but the time // has already passed it is also not needed to restore the consumer if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() { - requiredApiLevel = "1" + requiredApiLevel = 1 } - cfg.Metadata[JSRequiredLevelMetadataKey] = requiredApiLevel + cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel) } // copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg. @@ -118,7 +120,7 @@ func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key str func preserveCreatedMetadata(metadata, prevMetadata map[string]string) { if prevMetadata == nil { metadata[JSCreatedVersionMetadataKey] = VERSION - metadata[JSCreatedLevelMetadataKey] = JSApiLevel + metadata[JSCreatedLevelMetadataKey] = strconv.Itoa(JSApiLevel) return } diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 7b2b5ac0909..ca77f22e3c4 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "strconv" "testing" "time" @@ -30,7 +31,7 @@ import ( func metadataAllSet(featureLevel string) map[string]string { return map[string]string{ JSCreatedVersionMetadataKey: VERSION, - JSCreatedLevelMetadataKey: JSApiLevel, + JSCreatedLevelMetadataKey: strconv.Itoa(JSApiLevel), JSRequiredLevelMetadataKey: featureLevel, } } @@ -283,7 +284,7 @@ func TestJetStreamMetadataMutations(t *testing.T) { func validateMetadata(metadata map[string]string, expectedFeatureLevel string) bool { return metadata[JSCreatedVersionMetadataKey] == VERSION || - metadata[JSCreatedLevelMetadataKey] == JSApiLevel || + metadata[JSCreatedLevelMetadataKey] == strconv.Itoa(JSApiLevel) || metadata[JSRequiredLevelMetadataKey] == expectedFeatureLevel } From 5205a052a553e5160e6dff949f3c6bb9bce1aca5 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 6 Sep 2024 09:32:55 +0200 Subject: [PATCH 14/14] Use pointers for setDynamicStreamMetadata Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 12 +++++++----- server/jetstream_cluster.go | 9 ++++++--- server/jetstream_versioning.go | 16 ++++++++-------- server/jetstream_versioning_test.go | 2 +- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 111a1d234bc..263ded5b79f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1475,10 +1475,11 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } + msetCfg := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: setDynamicStreamMetadata(mset.config()), + Config: *setDynamicStreamMetadata(&msetCfg), TimeStamp: time.Now().UTC(), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -1569,10 +1570,11 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, return } + msetCfg := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: setDynamicStreamMetadata(mset.config()), + Config: *setDynamicStreamMetadata(&msetCfg), Domain: s.getOpts().JetStreamDomain, Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -1951,11 +1953,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s } config := mset.config() - resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.stateWithDetail(details), - Config: setDynamicStreamMetadata(config), + Config: *setDynamicStreamMetadata(&config), Domain: s.getOpts().JetStreamDomain, Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), @@ -3591,10 +3592,11 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC s.Warnf("Restore failed for %s for stream '%s > %s' in %v", friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start)) } else { + msetCfg := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: setDynamicStreamMetadata(mset.config()), + Config: *setDynamicStreamMetadata(&msetCfg), TimeStamp: time.Now().UTC(), } s.Noticef("Completed restore of %s for stream '%s > %s' in %v", diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 6e342839b04..a5bd993b7f1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3275,10 +3275,11 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) { resp.Error = NewJSStreamCreateError(err, Unless(err)) s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) } else { + msetCfg := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: setDynamicStreamMetadata(mset.config()), + Config: *setDynamicStreamMetadata(&msetCfg), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), @@ -3705,10 +3706,11 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss // Send our response. var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} + msetCfg := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: setDynamicStreamMetadata(mset.config()), + Config: *setDynamicStreamMetadata(&msetCfg), Cluster: js.clusterInfo(mset.raftGroup()), Mirror: mset.mirrorInfo(), Sources: mset.sourcesInfo(), @@ -3772,10 +3774,11 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme if !recovering { var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} + msetCfg := mset.config() resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), State: mset.state(), - Config: setDynamicStreamMetadata(mset.config()), + Config: *setDynamicStreamMetadata(&msetCfg), Cluster: js.clusterInfo(mset.raftGroup()), Sources: mset.sourcesInfo(), Mirror: mset.mirrorInfo(), diff --git a/server/jetstream_versioning.go b/server/jetstream_versioning.go index b9cba082f49..ee2d3f55cd5 100644 --- a/server/jetstream_versioning.go +++ b/server/jetstream_versioning.go @@ -54,15 +54,15 @@ func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) { } // setDynamicStreamMetadata adds dynamic fields into the (copied) metadata. -func setDynamicStreamMetadata(cfg StreamConfig) StreamConfig { - prevMetadata := cfg.Metadata - cfg.Metadata = make(map[string]string) - for key, value := range prevMetadata { - cfg.Metadata[key] = value +func setDynamicStreamMetadata(cfg *StreamConfig) *StreamConfig { + newCfg := *cfg + newCfg.Metadata = make(map[string]string) + for key, value := range cfg.Metadata { + newCfg.Metadata[key] = value } - cfg.Metadata[JSServerVersionMetadataKey] = VERSION - cfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel) - return cfg + newCfg.Metadata[JSServerVersionMetadataKey] = VERSION + newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel) + return &newCfg } // setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level. diff --git a/server/jetstream_versioning_test.go b/server/jetstream_versioning_test.go index 0724bfce3aa..f376bd4ea3d 100644 --- a/server/jetstream_versioning_test.go +++ b/server/jetstream_versioning_test.go @@ -126,7 +126,7 @@ func TestJetStreamSetStaticStreamMetadataRemoveDynamicFields(t *testing.T) { func TestJetStreamSetDynamicStreamMetadata(t *testing.T) { cfg := StreamConfig{Metadata: metadataAllSet("0")} - newCfg := setDynamicStreamMetadata(cfg) + newCfg := setDynamicStreamMetadata(&cfg) // Only new metadata must contain dynamic fields. metadata := metadataAllSet("0")