diff --git a/internal/impl/nats/integration_kv_test.go b/internal/impl/nats/integration_kv_test.go index affe637c2..f5a9d1002 100644 --- a/internal/impl/nats/integration_kv_test.go +++ b/internal/impl/nats/integration_kv_test.go @@ -151,6 +151,7 @@ cache_resources: require.NoError(t, err) m := service.NewMessage([]byte("hello")) + m.MetaSetMut("inherited", "world") return p.Process(context.Background(), m) } @@ -172,6 +173,12 @@ cache_resources: bytes, err := m.AsBytes() require.NoError(t, err) assert.Equal(t, []byte("lawblog"), bytes) + if v, ok := m.MetaGetMut("inherited"); ok { + assert.Equal(t, "world", v) + } else { + t.Error("inherited metadata not found") + } + }) t.Run("get_revision operation", func(t *testing.T) { @@ -193,6 +200,12 @@ cache_resources: bytes, err := m.AsBytes() require.NoError(t, err) assert.Equal(t, []byte("lawblog"), bytes) + if v, ok := m.MetaGetMut("inherited"); ok { + assert.Equal(t, "world", v) + } else { + t.Error("inherited metadata not found") + } + }) t.Run("create operation (success)", func(t *testing.T) { @@ -210,6 +223,12 @@ cache_resources: bytes, err := m.AsBytes() require.NoError(t, err) assert.Equal(t, []byte("hello"), bytes) + if v, ok := m.MetaGetMut("inherited"); ok { + assert.Equal(t, "world", v) + } else { + t.Error("inherited metadata not found") + } + }) t.Run("create operation (error)", func(t *testing.T) { diff --git a/internal/impl/nats/processor_kv.go b/internal/impl/nats/processor_kv.go index 8ef1775f2..85b49b1f8 100644 --- a/internal/impl/nats/processor_kv.go +++ b/internal/impl/nats/processor_kv.go @@ -217,7 +217,10 @@ func (p *kvProcessor) Process(ctx context.Context, msg *service.Message) (servic if err != nil { return nil, err } - return service.MessageBatch{newMessageFromKVEntry(entry)}, nil + m := msg.Copy() + p.addMetadataFromEntry(m, entry) + m.SetBytes(entry.Value()) + return service.MessageBatch{m}, nil case kvpOperationGetRevision: revision, err := p.parseRevision(msg) @@ -228,7 +231,10 @@ func (p *kvProcessor) Process(ctx context.Context, msg *service.Message) (servic if err != nil { return nil, err } - return service.MessageBatch{newMessageFromKVEntry(entry)}, nil + m := msg.Copy() + p.addMetadataFromEntry(m, entry) + m.SetBytes(entry.Value()) + return service.MessageBatch{m}, nil case kvpOperationCreate: revision, err := kv.Create(ctx, key, bytes) @@ -359,6 +365,15 @@ func (p *kvProcessor) addMetadata(msg *service.Message, key string, revision uin msg.MetaSetMut(metaKVOperation, operation.String()) } +func (p *kvProcessor) addMetadataFromEntry(msg *service.Message, entry jetstream.KeyValueEntry) { + msg.MetaSetMut(metaKVKey, entry.Key()) + msg.MetaSetMut(metaKVBucket, p.bucket) + msg.MetaSetMut(metaKVRevision, entry.Revision()) + msg.MetaSetMut(metaKVDelta, entry.Delta()) + msg.MetaSetMut(metaKVOperation, entry.Operation().String()) + msg.MetaSetMut(metaKVCreated, entry.Created()) +} + func (p *kvProcessor) Connect(ctx context.Context) (err error) { p.connMut.Lock() defer p.connMut.Unlock()