Skip to content

Commit f5d181d

Browse files
committed
[ADDED] Publish header "Nats-Expected-Last-Subject-Sequence-Subject"
This change adds a new header "Nats-Expected-Last-Subject-Sequence-Subject" when when paired with "Nats-Expected-Last-Subject-Sequence" allows publishers to customize the subject used when the server enforces "Nats-Expected-Last-Subject-Sequence". Publishers can specify a alternative subject to be used that includes wildcards. Resolves #5280 Signed-off-by: Caleb Champlin <[email protected]>
1 parent fcff483 commit f5d181d

File tree

3 files changed

+126
-14
lines changed

3 files changed

+126
-14
lines changed

server/jetstream_cluster.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -7657,9 +7657,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
76577657
// Expected last sequence per subject.
76587658
// We can check for last sequence per subject but only if the expected seq <= lseq.
76597659
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq {
7660+
// Allow override of the subject used for the check.
7661+
seqSubj := subject
7662+
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
7663+
seqSubj = optSubj
7664+
}
7665+
76607666
var smv StoreMsg
76617667
var fseq uint64
7662-
sm, err := store.LoadLastMsg(subject, &smv)
7668+
sm, err := store.LoadLastMsg(seqSubj, &smv)
76637669
if sm != nil {
76647670
fseq = sm.seq
76657671
}

server/jetstream_test.go

+94
Original file line numberDiff line numberDiff line change
@@ -11269,6 +11269,100 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) {
1126911269
}
1127011270
}
1127111271

11272+
func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) {
11273+
for _, st := range []StorageType{FileStorage, MemoryStorage} {
11274+
t.Run(st.String(), func(t *testing.T) {
11275+
c := createJetStreamClusterExplicit(t, "JSC", 3)
11276+
defer c.shutdown()
11277+
11278+
nc, js := jsClientConnect(t, c.randomServer())
11279+
defer nc.Close()
11280+
11281+
cfg := StreamConfig{
11282+
Name: "KV",
11283+
Subjects: []string{"kv.>"},
11284+
Storage: st,
11285+
Replicas: 3,
11286+
MaxMsgsPer: 1,
11287+
}
11288+
11289+
req, err := json.Marshal(cfg)
11290+
if err != nil {
11291+
t.Fatalf("Unexpected error: %v", err)
11292+
}
11293+
// Do manually for now.
11294+
m, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
11295+
require_NoError(t, err)
11296+
si, err := js.StreamInfo("KV")
11297+
if err != nil {
11298+
t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data))
11299+
}
11300+
if si == nil || si.Config.Name != "KV" {
11301+
t.Fatalf("StreamInfo is not correct %+v", si)
11302+
}
11303+
11304+
js.PublishAsync("kv.1.foo", []byte("1:1")) // Last is 1 for kv.1.foo; 1 for kv.1.*;
11305+
js.PublishAsync("kv.1.bar", []byte("1:2")) // Last is 2 for kv.1.bar; 2 for kv.1.*;
11306+
js.PublishAsync("kv.2.foo", []byte("2:1")) // Last is 3 for kv.2.foo; 3 for kv.2.*;
11307+
js.PublishAsync("kv.3.bar", []byte("3:1")) // Last is 4 for kv.3.bar; 4 for kv.3.*;
11308+
js.PublishAsync("kv.1.baz", []byte("1:3")) // Last is 5 for kv.1.baz; 5 for kv.1.*;
11309+
js.PublishAsync("kv.1.bar", []byte("1:4")) // Last is 6 for kv.1.baz; 6 for kv.1.*;
11310+
js.PublishAsync("kv.2.baz", []byte("2:2")) // Last is 7 for kv.2.baz; 7 for kv.2.*;
11311+
11312+
select {
11313+
case <-js.PublishAsyncComplete():
11314+
case <-time.After(time.Second):
11315+
t.Fatalf("Did not receive completion signal")
11316+
}
11317+
11318+
// Now make sure we get an error if the last sequence is not correct per subject.
11319+
pubAndCheck := func(subj, seq string, ok bool) {
11320+
t.Helper()
11321+
m := nats.NewMsg(subj)
11322+
m.Data = []byte("HELLO")
11323+
11324+
// Expect last to be seq.
11325+
m.Header.Set(JSExpectedLastSubjSeq, seq)
11326+
11327+
// Constrain the sequence restriction to a specific subject
11328+
// e.g. "kv.1.*" for kv.1.foo, kv.1.bar, kv.1.baz; kv.2.* for kv.2.foo, kv.2.baz; kv.3.* for kv.3.bar
11329+
filterSubject := fmt.Sprintf("%s.*", subj[:strings.LastIndex(subj, ".")])
11330+
m.Header.Set(JSExpectedLastSubjSeqSubj, filterSubject)
11331+
_, err := js.PublishMsg(m)
11332+
if ok && err != nil {
11333+
t.Fatalf("Unexpected error: %v", err)
11334+
}
11335+
if !ok && err == nil {
11336+
t.Fatalf("Expected to get an error and got none")
11337+
}
11338+
}
11339+
11340+
pubAndCheck("kv.1.foo", "0", false)
11341+
pubAndCheck("kv.1.bar", "0", false)
11342+
pubAndCheck("kv.1.xxx", "0", false)
11343+
pubAndCheck("kv.1.foo", "1", false)
11344+
pubAndCheck("kv.1.bar", "1", false)
11345+
pubAndCheck("kv.1.xxx", "1", false)
11346+
pubAndCheck("kv.2.foo", "1", false)
11347+
pubAndCheck("kv.2.bar", "1", false)
11348+
pubAndCheck("kv.2.xxx", "1", false)
11349+
pubAndCheck("kv.1.bar", "2", false)
11350+
pubAndCheck("kv.1.bar", "3", false)
11351+
pubAndCheck("kv.1.bar", "4", false)
11352+
pubAndCheck("kv.1.bar", "5", false)
11353+
pubAndCheck("kv.1.bar", "6", true) // Last is 8 for kv.1.bar; 8 for kv.1.*;
11354+
pubAndCheck("kv.1.baz", "2", false)
11355+
pubAndCheck("kv.1.bar", "7", false)
11356+
pubAndCheck("kv.1.xxx", "8", true) // Last is 9 for kv.1.xxx; 9 for kv.1.*;
11357+
pubAndCheck("kv.2.foo", "2", false)
11358+
pubAndCheck("kv.2.foo", "7", true) // Last is 10 for kv.2.foo; 10 for kv.2.*;
11359+
pubAndCheck("kv.xxx", "0", true) // Last is 11 for kv.xxx; 11 for kv.*;
11360+
pubAndCheck("kv.3.xxx", "4", true) // Last is 12 for kv.3.xxx; 12 for kv.3.*;
11361+
pubAndCheck("kv.3.xyz", "12", true) // Last is 13 for kv.3.xyz; 13 for kv.3.*;
11362+
})
11363+
}
11364+
}
11365+
1127211366
func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) {
1127311367
s := RunBasicJetStreamServer(t)
1127411368
defer s.Shutdown()

server/stream.go

+25-13
Original file line numberDiff line numberDiff line change
@@ -334,18 +334,19 @@ const (
334334

335335
// Headers for published messages.
336336
const (
337-
JSMsgId = "Nats-Msg-Id"
338-
JSExpectedStream = "Nats-Expected-Stream"
339-
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
340-
JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
341-
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
342-
JSStreamSource = "Nats-Stream-Source"
343-
JSLastConsumerSeq = "Nats-Last-Consumer"
344-
JSLastStreamSeq = "Nats-Last-Stream"
345-
JSConsumerStalled = "Nats-Consumer-Stalled"
346-
JSMsgRollup = "Nats-Rollup"
347-
JSMsgSize = "Nats-Msg-Size"
348-
JSResponseType = "Nats-Response-Type"
337+
JSMsgId = "Nats-Msg-Id"
338+
JSExpectedStream = "Nats-Expected-Stream"
339+
JSExpectedLastSeq = "Nats-Expected-Last-Sequence"
340+
JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence"
341+
JSExpectedLastSubjSeqSubj = "Nats-Expected-Last-Subject-Sequence-Subject"
342+
JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id"
343+
JSStreamSource = "Nats-Stream-Source"
344+
JSLastConsumerSeq = "Nats-Last-Consumer"
345+
JSLastStreamSeq = "Nats-Last-Stream"
346+
JSConsumerStalled = "Nats-Consumer-Stalled"
347+
JSMsgRollup = "Nats-Rollup"
348+
JSMsgSize = "Nats-Msg-Size"
349+
JSResponseType = "Nats-Response-Type"
349350
)
350351

351352
// Headers for republished messages and direct gets.
@@ -3978,6 +3979,11 @@ func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) {
39783979
return uint64(parseInt64(bseq)), true
39793980
}
39803981

3982+
// Fast lookup of expected subject for the expected stream sequence per subject.
3983+
func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
3984+
return string(getHeader(JSExpectedLastSubjSeqSubj, hdr))
3985+
}
3986+
39813987
// Signal if we are clustered. Will acquire rlock.
39823988
func (mset *stream) IsClustered() bool {
39833989
mset.mu.RLock()
@@ -4538,10 +4544,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
45384544
// Expected last sequence per subject.
45394545
// If we are clustered we have prechecked seq > 0.
45404546
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
4547+
// Allow override of the subject used for the check.
4548+
seqSubj := subject
4549+
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
4550+
seqSubj = optSubj
4551+
}
4552+
45414553
// TODO(dlc) - We could make a new store func that does this all in one.
45424554
var smv StoreMsg
45434555
var fseq uint64
4544-
sm, err := store.LoadLastMsg(subject, &smv)
4556+
sm, err := store.LoadLastMsg(seqSubj, &smv)
45454557
if sm != nil {
45464558
fseq = sm.seq
45474559
}

0 commit comments

Comments
 (0)