From 0df9af33c75e84a541d821945095582e1852a0d4 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 24 Feb 2025 12:50:57 +0000 Subject: [PATCH 1/4] Update dependencies, minimum Go 1.23 Signed-off-by: Neil Twigg --- .travis.yml | 4 ---- go.mod | 8 ++++---- go.sum | 8 ++++---- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index eb03f2663b0..8a502709b9b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,6 @@ go: # This should be quoted or use .x, but should not be unquoted. # Remember that a YAML bare float drops trailing zeroes. - "1.23.6" - - "1.22.12" go_import_path: github.com/nats-io/nats-server @@ -46,9 +45,6 @@ jobs: env: TEST_SUITE=srv_pkg_non_js_tests - name: "Run all tests from all other packages" env: TEST_SUITE=non_srv_pkg_tests - - name: "Compile with older Go release" - go: "1.21.x" - env: TEST_SUITE=build_only script: ./scripts/runTestsOnTravis.sh $TEST_SUITE diff --git a/go.mod b/go.mod index d89f8013655..ae1f8ac4f7b 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,18 @@ module github.com/nats-io/nats-server/v2 -go 1.22.0 +go 1.23.0 -toolchain go1.22.8 +toolchain go1.23.6 require ( github.com/klauspost/compress v1.18.0 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.7.3 - github.com/nats-io/nats.go v1.39.0 + github.com/nats-io/nats.go v1.39.1 github.com/nats-io/nkeys v0.4.10 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.33.0 + golang.org/x/crypto v0.34.0 golang.org/x/sys v0.30.0 golang.org/x/time v0.10.0 ) diff --git a/go.sum b/go.sum index 67bbd5ac398..fb6deedbe76 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= 
-github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI= -github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= +github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk= +github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA= +golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= From 7243f427ab2c8b16479540b88ccfade738744bf3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 24 Feb 2025 11:50:31 -0500 Subject: [PATCH 2/4] [IMPROVED] Adjusted stalled producers to not be as penalizing. 
(#6568) Fixes #5394 Signed-off-by: Derek Collison --- server/client.go | 49 +++++++++++++++++++++++------------- server/client_test.go | 26 ------------------- server/norace_test.go | 58 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 43 deletions(-) diff --git a/server/client.go b/server/client.go index 1556fcb49ee..e3724bc3e54 100644 --- a/server/client.go +++ b/server/client.go @@ -113,8 +113,9 @@ const ( maxNoRTTPingBeforeFirstPong = 2 * time.Second // For stalling fast producers - stallClientMinDuration = 100 * time.Millisecond - stallClientMaxDuration = time.Second + stallClientMinDuration = 2 * time.Millisecond + stallClientMaxDuration = 5 * time.Millisecond + stallTotalAllowed = 10 * time.Millisecond ) var readLoopReportThreshold = readLoopReport @@ -462,6 +463,9 @@ type readCache struct { // Capture the time we started processing our readLoop. start time.Time + + // Total time stalled so far for readLoop processing. + tst time.Duration } // set the flag (would be equivalent to set the boolean to true) @@ -1414,6 +1418,11 @@ func (c *client) readLoop(pre []byte) { } return } + // Clear total stalled time here. + if c.in.tst >= stallClientMaxDuration { + c.rateLimitFormatWarnf("Producer was stalled for a total of %v", c.in.tst.Round(time.Millisecond)) + } + c.in.tst = 0 } // If we are a ROUTER/LEAF and have processed an INFO, it is possible that @@ -1730,7 +1739,7 @@ func (c *client) flushOutbound() bool { // Check if we have a stalled gate and if so and we are recovering release // any stalled producers. Only kind==CLIENT will stall. - if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/2) { + if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/4*3) { close(c.out.stc) c.out.stc = nil } @@ -2292,7 +2301,8 @@ func (c *client) queueOutbound(data []byte) { // Check here if we should create a stall channel if we are falling behind. 
// We do this here since if we wait for consumer's writeLoop it could be // too late with large number of fan in producers. - if c.out.pb > c.out.mp/2 && c.out.stc == nil { + // If the outbound connection is > 75% of maximum pending allowed, create a stall gate. + if c.out.pb > c.out.mp/4*3 && c.out.stc == nil { c.out.stc = make(chan struct{}) } } @@ -3337,31 +3347,36 @@ func (c *client) msgHeader(subj, reply []byte, sub *subscription) []byte { } func (c *client) stalledWait(producer *client) { + // Check to see if we have exceeded our total wait time per readLoop invocation. + if producer.in.tst > stallTotalAllowed { + return + } + + // Grab stall channel which the slow consumer will close when caught up. stall := c.out.stc - ttl := stallDuration(c.out.pb, c.out.mp) + c.mu.Unlock() defer c.mu.Lock() + // Calculate stall time. + ttl := stallClientMinDuration + if c.out.pb >= c.out.mp { + ttl = stallClientMaxDuration + } + // Now check if we are close to total allowed. + if producer.in.tst+ttl > stallTotalAllowed { + ttl = stallTotalAllowed - producer.in.tst + } delay := time.NewTimer(ttl) defer delay.Stop() + start := time.Now() select { case <-stall: case <-delay.C: producer.Debugf("Timed out of fast producer stall (%v)", ttl) } -} - -func stallDuration(pb, mp int64) time.Duration { - ttl := stallClientMinDuration - if pb >= mp { - ttl = stallClientMaxDuration - } else if hmp := mp / 2; pb > hmp { - bsz := hmp / 10 - additional := int64(ttl) * ((pb - hmp) / bsz) - ttl += time.Duration(additional) - } - return ttl + producer.in.tst += time.Since(start) } // Used to treat maps as efficient set diff --git a/server/client_test.go b/server/client_test.go index 8de1a5bafba..abf0cb89502 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2062,32 +2062,6 @@ func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) { } } -func TestClientStalledDuration(t *testing.T) { - for _, test := range []struct { - name string - pb int64 - mp int64 - expectedTTL 
time.Duration - }{ - {"pb above mp", 110, 100, stallClientMaxDuration}, - {"pb equal mp", 100, 100, stallClientMaxDuration}, - {"pb below mp/2", 49, 100, stallClientMinDuration}, - {"pb equal mp/2", 50, 100, stallClientMinDuration}, - {"pb at 55% of mp", 55, 100, stallClientMinDuration + 1*stallClientMinDuration}, - {"pb at 60% of mp", 60, 100, stallClientMinDuration + 2*stallClientMinDuration}, - {"pb at 70% of mp", 70, 100, stallClientMinDuration + 4*stallClientMinDuration}, - {"pb at 80% of mp", 80, 100, stallClientMinDuration + 6*stallClientMinDuration}, - {"pb at 90% of mp", 90, 100, stallClientMinDuration + 8*stallClientMinDuration}, - {"pb at 99% of mp", 99, 100, stallClientMinDuration + 9*stallClientMinDuration}, - } { - t.Run(test.name, func(t *testing.T) { - if ttl := stallDuration(test.pb, test.mp); ttl != test.expectedTTL { - t.Fatalf("For pb=%v mp=%v, expected TTL to be %v, got %v", test.pb, test.mp, test.expectedTTL, ttl) - } - }) - } -} - func TestClientIPv6Address(t *testing.T) { opts := DefaultOptions() opts.Host = "0.0.0.0" diff --git a/server/norace_test.go b/server/norace_test.go index 8c9785f505c..9f58bd81b11 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11502,3 +11502,61 @@ func TestNoRaceNoFastProducerStall(t *testing.T) { } wg.Wait() } + +func TestNoRaceProducerStallLimits(t *testing.T) { + tmpl := ` + listen: "127.0.0.1:-1" + ` + conf := createConfFile(t, []byte(tmpl)) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + ncSlow := natsConnect(t, s.ClientURL()) + defer ncSlow.Close() + natsSub(t, ncSlow, "foo", func(m *nats.Msg) { m.Respond([]byte("42")) }) + natsFlush(t, ncSlow) + + ncProd := natsConnect(t, s.ClientURL()) + defer ncProd.Close() + + cid, err := ncSlow.GetClientID() + require_NoError(t, err) + c := s.GetClient(cid) + require_True(t, c != nil) + + // Artificially set a stall channel on the subscriber. 
+ c.mu.Lock() + c.out.stc = make(chan struct{}) + c.mu.Unlock() + + start := time.Now() + _, err = ncProd.Request("foo", []byte("HELLO"), time.Second) + elapsed := time.Since(start) + require_NoError(t, err) + + // This should not have cleared on its own, but should have paused between min and max. + require_True(t, elapsed >= stallClientMinDuration) + require_True(t, elapsed < stallClientMaxDuration) + + // Now test total maximum by loading up a bunch of requests and measuring the last one. + // Artificially set a stall channel again on the subscriber. + c.mu.Lock() + c.out.stc = make(chan struct{}) + // This will prevent us from clearing the stc. + c.out.pb = c.out.mp/4*3 + 100 + c.mu.Unlock() + + for i := 0; i < 10; i++ { + err = ncProd.PublishRequest("foo", "bar", []byte("HELLO")) + require_NoError(t, err) + } + start = time.Now() + _, err = ncProd.Request("foo", []byte("HELLO"), time.Second) + elapsed = time.Since(start) + require_NoError(t, err) + + require_True(t, elapsed >= stallTotalAllowed) + // Should always be close to totalAllowed (e.g. 10ms), but if you run a lot of them in one go it can bump up + // just past 12ms, hence the Max setting below to avoid a flapper. + require_True(t, elapsed < stallTotalAllowed+stallClientMaxDuration) +} From c53419e8c82a0df505c0e81e1f23dd1d934f8a34 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 24 Feb 2025 14:33:43 -0500 Subject: [PATCH 3/4] [FIXED] Preserve max delivered messages with Interest retention (#6575) Resolves https://github.com/nats-io/nats-server/issues/6538 If a consumer reached max deliveries for a message, it should preserve the redelivered state and allow inspecting its content. However, if a new consumer would be created and consume this message as well, it would still be removed under Interest retention. This PR fixes that by using the redelivered state to keep marking there's interest. Only downside is that the redelivered state gets cleaned up after a restart (this PR does not change/fix that). 
So if the consumer that had a max delivery message keeps acknowledging messages and its acknowledgement floor moves up, it would clean up the redelivered state below this ack floor. Honestly I feel like keeping messages around if max delivery is reached makes the code very complex. It would be a lot cleaner if we'd only have the acknowledgement floor, starting sequence, and pending messages in-between, not also redelivered state that can be below ack floor. It's not something we can change now I suppose, but I'd be in favor of having messages automatically be removed once max delivery is reached and all consumers have consumed the message. DLQ-style behavior would then be more explicitly (and reliably) handled by the client, for example by publishing into another stream and then TERM the message, instead of relying on advisories that could be missed. Signed-off-by: Maurice van Veen --- server/consumer.go | 44 ++++++------ server/jetstream_cluster_1_test.go | 2 +- server/jetstream_cluster_3_test.go | 2 +- server/jetstream_consumer_test.go | 53 +++++++++++++++ server/jetstream_test.go | 105 +++++++++++++++++++++++++++++ 5 files changed, 185 insertions(+), 21 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index dc5f7e58288..83f2f3ce83f 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1749,11 +1749,16 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool { if o.maxp > 0 && len(o.pending) >= o.maxp { o.signalNewMessages() } - // Cleanup our tracking. - delete(o.pending, seq) - if o.rdc != nil { - delete(o.rdc, seq) + // Make sure to remove from pending. + if p, ok := o.pending[seq]; ok && p != nil { + delete(o.pending, seq) + o.updateDelivered(p.Sequence, seq, dc, p.Timestamp) + } + // Ensure redelivered state is set, if not already. 
+ if o.rdc == nil { + o.rdc = make(map[uint64]uint64) } + o.rdc[seq] = dc return true } return false @@ -2988,6 +2993,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { var needAck bool var asflr, osseq uint64 var pending map[uint64]*Pending + var rdc map[uint64]uint64 o.mu.RLock() defer o.mu.RUnlock() @@ -3012,7 +3018,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } if o.isLeader() { asflr, osseq = o.asflr, o.sseq - pending = o.pending + pending, rdc = o.pending, o.rdc } else { if o.store == nil { return false @@ -3023,7 +3029,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { return sseq > o.asflr && !o.isFiltered() } // If loading state as here, the osseq is +1. - asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending + asflr, osseq, pending, rdc = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending, state.Redelivered } switch o.cfg.AckPolicy { @@ -3039,6 +3045,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } } + // Finally check if redelivery of this message is tracked. + // If the message is not pending, it should be preserved if it reached max delivery. + if !needAck { + _, needAck = rdc[sseq] + } + return needAck } @@ -3497,7 +3509,10 @@ func (o *consumer) deliveryCount(seq uint64) uint64 { if o.rdc == nil { return 1 } - return o.rdc[seq] + if dc := o.rdc[seq]; dc >= 1 { + return dc + } + return 1 } // Increase the delivery count for this message. @@ -3811,10 +3826,7 @@ func (o *consumer) checkAckFloor() { // Check if this message was pending. o.mu.RLock() p, isPending := o.pending[seq] - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[seq] - } + rdc := o.deliveryCount(seq) o.mu.RUnlock() // If it was pending for us, get rid of it. 
if isPending { @@ -3832,10 +3844,7 @@ func (o *consumer) checkAckFloor() { if p != nil { dseq = p.Sequence } - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[seq] - } + rdc := o.deliveryCount(seq) toTerm = append(toTerm, seq, dseq, rdc) } } @@ -5409,10 +5418,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { // Check if this message was pending. p, wasPending := o.pending[sseq] - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[sseq] - } + rdc := o.deliveryCount(sseq) o.mu.Unlock() diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index ca2eed3e507..6ae0afa8700 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6601,7 +6601,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) { require_Equal(t, ci.AckFloor.Consumer, 1) require_Equal(t, ci.AckFloor.Stream, 1) require_Equal(t, ci.NumAckPending, 0) - require_Equal(t, ci.NumRedelivered, 0) + require_Equal(t, ci.NumRedelivered, 1) require_Equal(t, ci.NumPending, 0) } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 13ca180bb02..18f4d4c15b4 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5427,7 +5427,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { require_Equal(t, a.AckFloor.Stream, 10) } require_Equal(t, a.NumPending, 40) - require_Equal(t, a.NumRedelivered, 0) + require_Equal(t, a.NumRedelivered, 10) a.Cluster, b.Cluster = nil, nil a.Delivered.Last, b.Delivered.Last = nil, nil if !reflect.DeepEqual(a, b) { diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index a45a30a13d5..4652c19d549 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1707,3 +1707,56 @@ func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) { }) } } + +func TestJetStreamConsumerDeliveryCount(t *testing.T) { + s := 
RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + for i := 0; i < 2; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "foo", + "CONSUMER", + nats.ManualAck(), + nats.AckExplicit(), + nats.AckWait(time.Second), + nats.MaxDeliver(1), + ) + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + msgs, err := sub.Fetch(2) + require_NoError(t, err) + require_Len(t, len(msgs), 2) + require_NoError(t, msgs[1].Nak()) + + require_Equal(t, o.deliveryCount(1), 1) + require_Equal(t, o.deliveryCount(2), 1) + + // max deliver 1 so this will fail + _, err = sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) + require_Error(t, err) + + // This would previously report delivery count 0, because o.rdc!=nil + require_Equal(t, o.deliveryCount(1), 1) + require_Equal(t, o.deliveryCount(2), 1) + +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 0b183f6e208..8a9a26ec79a 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -23203,3 +23203,108 @@ func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) { // We've fetched 2 message so should report 0 pending. 
requireExpected(0) } + +// https://github.com/nats-io/nats-server/issues/6538 +func TestJetStreamInterestMaxDeliveryReached(t *testing.T) { + maxWait := 250 * time.Millisecond + for _, useNak := range []bool{true, false} { + for _, test := range []struct { + title string + action func(s *Server, sub *nats.Subscription) + }{ + { + title: "fetch", + action: func(s *Server, sub *nats.Subscription) { + time.Sleep(time.Second) + + // max deliver 1 so this will fail + _, err := sub.Fetch(1, nats.MaxWait(maxWait)) + require_Error(t, err) + }, + }, + { + title: "expire pending", + action: func(s *Server, sub *nats.Subscription) { + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("consumer") + require_NotNil(t, o) + + o.mu.Lock() + o.forceExpirePending() + o.mu.Unlock() + }, + }, + } { + title := fmt.Sprintf("nak/%s", test.title) + if !useNak { + title = fmt.Sprintf("no-%s", title) + } + t.Run(title, func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Storage: nats.FileStorage, + Subjects: []string{"test"}, + Replicas: 1, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("test", "consumer", nats.AckWait(time.Second), nats.MaxDeliver(1)) + require_NoError(t, err) + + _, err = nc.Request("test", []byte("hello"), maxWait) + require_NoError(t, err) + + nfo, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, nfo.State.Msgs, uint64(1)) + + msg, err := sub.Fetch(1, nats.MaxWait(maxWait)) + require_NoError(t, err) + require_Len(t, 1, len(msg)) + if useNak { + require_NoError(t, msg[0].Nak()) + } + + cnfo, err := js.ConsumerInfo("TEST", "consumer") + require_NoError(t, err) + require_Equal(t, cnfo.NumAckPending, 1) + + test.action(s, sub) + + // max 
deliver 1 so this will fail + _, err = sub.Fetch(1, nats.MaxWait(maxWait)) + require_Error(t, err) + + cnfo, err = js.ConsumerInfo("TEST", "consumer") + require_NoError(t, err) + require_Equal(t, cnfo.NumAckPending, 0) + + nfo, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, nfo.State.Msgs, uint64(1)) + + sub2, err := js.PullSubscribe("test", "consumer2") + require_NoError(t, err) + + msg, err = sub2.Fetch(1) + require_NoError(t, err) + require_Len(t, 1, len(msg)) + require_NoError(t, msg[0].AckSync()) + + nfo, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, nfo.State.Msgs, uint64(1)) + }) + } + } +} From 4ec93372de44e73275451346e18e1a6381c3d4ef Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 24 Feb 2025 16:20:09 +0000 Subject: [PATCH 4/4] Reduce allocations in `writeMsgRecord` Signed-off-by: Neil Twigg --- server/filestore.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 60829447afb..7168e76a453 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5323,9 +5323,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // With headers, high bit on total length will be set. // total_len(4) sequence(8) timestamp(8) subj_len(2) subj hdr_len(4) hdr msg hash(8) - // First write header, etc. var le = binary.LittleEndian - var hdr [msgHdrSize]byte l := uint32(rl) hasHeaders := len(mhdr) > 0 @@ -5333,13 +5331,15 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte l |= hbit } + // Reserve space for the header on the underlying buffer. + mb.cache.buf = append(mb.cache.buf, make([]byte, msgHdrSize)...) + hdr := mb.cache.buf[len(mb.cache.buf)-msgHdrSize : len(mb.cache.buf)] le.PutUint32(hdr[0:], l) le.PutUint64(hdr[4:], seq) le.PutUint64(hdr[12:], uint64(ts)) le.PutUint16(hdr[20:], uint16(len(subj))) // Now write to underlying buffer. 
- mb.cache.buf = append(mb.cache.buf, hdr[:]...) mb.cache.buf = append(mb.cache.buf, subj...) if hasHeaders { @@ -5353,13 +5353,12 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // Calculate hash. mb.hh.Reset() mb.hh.Write(hdr[4:20]) - mb.hh.Write([]byte(subj)) + mb.hh.Write(stringToBytes(subj)) if hasHeaders { mb.hh.Write(mhdr) } mb.hh.Write(msg) - checksum := mb.hh.Sum(nil) - // Grab last checksum + checksum := mb.hh.Sum(mb.lchk[:0:highwayhash.Size64]) copy(mb.lchk[0:], checksum) // Update write through cache.