Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(2.11) ADR-44: JetStream Asset Versioning in Metadata #5850

Merged
merged 12 commits into from
Sep 7, 2024
1 change: 1 addition & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
s.Noticef(" TPM File: %q, Pcr: %d", opts.JetStreamTpm.KeysFile,
opts.JetStreamTpm.Pcr)
}
s.Noticef(" API Level: %d", JSApiLevel)
s.Noticef("-------------------------------------------")

// Setup our internal subscriptions.
Expand Down
24 changes: 22 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,9 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}

// Initialize asset version metadata.
setStaticStreamMetadata(&cfg.StreamConfig, nil)

streamName := streamNameFromSubject(subject)
if streamName != cfg.Name {
resp.Error = NewJSStreamMismatchError()
Expand Down Expand Up @@ -1557,6 +1560,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
return
}

// Update asset version metadata.
setStaticStreamMetadata(&cfg, &mset.cfg)

if err := mset.updatePedantic(&cfg, ncfg.Pedantic); err != nil {
resp.Error = NewJSStreamUpdateError(err, Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -4028,12 +4034,17 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
return
}

// If the consumer already exists then don't allow updating the PauseUntil, just set
// it back to whatever the current configured value is.
var oldCfg *ConsumerConfig
if o := stream.lookupConsumer(consumerName); o != nil {
oldCfg = &o.cfg
// If the consumer already exists then don't allow updating the PauseUntil, just set
// it back to whatever the current configured value is.
req.Config.PauseUntil = o.cfg.PauseUntil
}

// Initialize/update asset version metadata.
setStaticConsumerMetadata(&req.Config, oldCfg)

o, err := stream.addConsumerWithAction(&req.Config, req.Action, req.Pedantic)

if err != nil {
Expand Down Expand Up @@ -4589,6 +4600,11 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
} else {
nca.Config.PauseUntil = nil
}

// Update asset version metadata due to updating pause/resume.
// Only PauseUntil is updated above, so reuse config for both.
setStaticConsumerMetadata(nca.Config, nca.Config)

eca := encodeAddConsumerAssignment(&nca)
cc.meta.Propose(eca)

Expand Down Expand Up @@ -4622,6 +4638,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
ncfg.PauseUntil = nil
}

// Update asset version metadata due to updating pause/resume.
// Only PauseUntil is updated above, so reuse config for both.
setStaticConsumerMetadata(&ncfg, &ncfg)

if err := obs.updateConfig(&ncfg); err != nil {
// The only type of error that should be returned here is from o.store,
// so use a store failed error type.
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6244,6 +6244,10 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}

// Update asset version metadata.
setStaticStreamMetadata(cfg, osa.Config)

var newCfg *StreamConfig
if jsa := js.accounts[acc.Name]; jsa != nil {
js.mu.Unlock()
Expand Down Expand Up @@ -7282,6 +7286,9 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
// if name was set by the user.
if oname != _EMPTY_ {
if ca = sa.consumers[oname]; ca != nil && !ca.deleted {
// Provided config might miss metadata, copy from existing config.
copyConsumerMetadata(cfg, ca.Config)

if action == ActionCreate && !reflect.DeepEqual(cfg, ca.Config) {
resp.Error = NewJSConsumerAlreadyExistsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
Expand All @@ -7296,6 +7303,13 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
}

// Initialize/update asset version metadata.
var oldCfg *ConsumerConfig
if ca != nil {
oldCfg = ca.Config
}
setStaticConsumerMetadata(cfg, oldCfg)

// If this is new consumer.
if ca == nil {
if action == ActionUpdate {
Expand Down
138 changes: 138 additions & 0 deletions server/jetstream_versioning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import "strconv"

const (
// JSApiLevel is the maximum supported JetStream API level for this server.
JSApiLevel int = 1

JSCreatedVersionMetadataKey = "_nats.created.server.version"
JSCreatedLevelMetadataKey = "_nats.created.server.api_level"
JSRequiredLevelMetadataKey = "_nats.server.require.api_level"
)

// setStaticStreamMetadata sets JetStream stream metadata, like the server version and API level.
// Given:
// - cfg!=nil, prevCfg==nil add stream: adds created and required metadata
// - cfg!=nil, prevCfg!=nil update stream: created metadata is preserved, required metadata is updated
func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
}

var prevMetadata map[string]string
if prevCfg != nil {
prevMetadata = prevCfg.Metadata
if prevMetadata == nil {
// Initialize to empty to indicate we had a previous config but metadata was missing.
prevMetadata = make(map[string]string)
}
}
preserveCreatedMetadata(cfg.Metadata, prevMetadata)

var requiredApiLevel int
cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

// setStaticConsumerMetadata sets JetStream consumer metadata, like the server version and API level.
// Given:
// - cfg!=nil, prevCfg==nil add consumer: adds created and required metadata
// - cfg!=nil, prevCfg!=nil update consumer: created metadata is preserved, required metadata is updated
func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
}

var prevMetadata map[string]string
if prevCfg != nil {
prevMetadata = prevCfg.Metadata
if prevMetadata == nil {
// Initialize to empty to indicate we had a previous config but metadata was missing.
prevMetadata = make(map[string]string)
}
}
preserveCreatedMetadata(cfg.Metadata, prevMetadata)

var requiredApiLevel int

// Added in 2.11, absent | zero is the feature is not used.
// one could be stricter and say even if its set but the time
// has already passed it is also not needed to restore the consumer
if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() {
requiredApiLevel = 1
}

cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg.
// Removes versioning fields if no previous metadata, updates if set, and removes fields if it doesn't exist in prevCfg.
//
// Note: useful when doing equality checks on cfg and prevCfg, but ignoring any versioning metadata differences.
// MUST be followed up with a call to setStaticConsumerMetadata to fix potentially lost metadata.
func copyConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
// Remove fields when no previous metadata.
if prevCfg == nil || prevCfg.Metadata == nil {
if cfg.Metadata != nil {
delete(cfg.Metadata, JSCreatedVersionMetadataKey)
delete(cfg.Metadata, JSCreatedLevelMetadataKey)
delete(cfg.Metadata, JSRequiredLevelMetadataKey)
if len(cfg.Metadata) == 0 {
cfg.Metadata = nil
}
}
return
}

// Set if exists, delete otherwise.
setOrDeleteInMetadata(cfg, prevCfg, JSCreatedVersionMetadataKey)
setOrDeleteInMetadata(cfg, prevCfg, JSCreatedLevelMetadataKey)
setOrDeleteInMetadata(cfg, prevCfg, JSRequiredLevelMetadataKey)
}

// setOrDeleteInMetadata sets field with key/value in metadata of cfg if set, deletes otherwise.
func setOrDeleteInMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig, key string) {
if value, ok := prevCfg.Metadata[key]; ok {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
}
cfg.Metadata[key] = value
} else {
delete(cfg.Metadata, key)
}
}

// preserveCreatedMetadata sets metadata to contain which version and API level the asset was created on.
// Preserves previous metadata, if not set it initializes versions for the metadata.
func preserveCreatedMetadata(metadata, prevMetadata map[string]string) {
if prevMetadata == nil {
metadata[JSCreatedVersionMetadataKey] = VERSION
metadata[JSCreatedLevelMetadataKey] = strconv.Itoa(JSApiLevel)
return
}

// Preserve previous metadata if it was set, but delete if not since it could be user-provided.
if v := prevMetadata[JSCreatedVersionMetadataKey]; v != _EMPTY_ {
metadata[JSCreatedVersionMetadataKey] = v
} else {
delete(metadata, JSCreatedVersionMetadataKey)
}
if v := prevMetadata[JSCreatedLevelMetadataKey]; v != _EMPTY_ {
metadata[JSCreatedLevelMetadataKey] = v
} else {
delete(metadata, JSCreatedLevelMetadataKey)
}
}
Loading