Cherry-picks for 2.10.26-RC.6 #6578

Merged 4 commits on Feb 25, 2025
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.23.6"
- "1.22.12"

go_import_path: github.com/nats-io/nats-server

Expand Down Expand Up @@ -46,9 +45,6 @@ jobs:
env: TEST_SUITE=srv_pkg_non_js_tests
- name: "Run all tests from all other packages"
env: TEST_SUITE=non_srv_pkg_tests
- name: "Compile with older Go release"
go: "1.21.x"
env: TEST_SUITE=build_only

script: ./scripts/runTestsOnTravis.sh $TEST_SUITE

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
module github.com/nats-io/nats-server/v2

go 1.22.0
go 1.23.0

toolchain go1.22.8
toolchain go1.23.6

require (
github.com/klauspost/compress v1.18.0
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats.go v1.39.0
github.com/nats-io/nats.go v1.39.1
github.com/nats-io/nkeys v0.4.10
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.33.0
golang.org/x/crypto v0.34.0
golang.org/x/sys v0.30.0
golang.org/x/time v0.10.0
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI=
github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk=
github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA=
golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
49 changes: 32 additions & 17 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ const (
maxNoRTTPingBeforeFirstPong = 2 * time.Second

// For stalling fast producers
stallClientMinDuration = 100 * time.Millisecond
stallClientMaxDuration = time.Second
stallClientMinDuration = 2 * time.Millisecond
stallClientMaxDuration = 5 * time.Millisecond
stallTotalAllowed = 10 * time.Millisecond
)

var readLoopReportThreshold = readLoopReport
Expand Down Expand Up @@ -462,6 +463,9 @@ type readCache struct {

// Capture the time we started processing our readLoop.
start time.Time

// Total time stalled so far for readLoop processing.
tst time.Duration
}

// set the flag (would be equivalent to set the boolean to true)
Expand Down Expand Up @@ -1414,6 +1418,11 @@ func (c *client) readLoop(pre []byte) {
}
return
}
// Clear total stalled time here.
if c.in.tst >= stallClientMaxDuration {
c.rateLimitFormatWarnf("Producer was stalled for a total of %v", c.in.tst.Round(time.Millisecond))
}
c.in.tst = 0
}

// If we are a ROUTER/LEAF and have processed an INFO, it is possible that
Expand Down Expand Up @@ -1730,7 +1739,7 @@ func (c *client) flushOutbound() bool {

// Check if we have a stalled gate and if so and we are recovering release
// any stalled producers. Only kind==CLIENT will stall.
if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/2) {
if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/4*3) {
close(c.out.stc)
c.out.stc = nil
}
Expand Down Expand Up @@ -2292,7 +2301,8 @@ func (c *client) queueOutbound(data []byte) {
// Check here if we should create a stall channel if we are falling behind.
// We do this here since if we wait for consumer's writeLoop it could be
// too late with large number of fan in producers.
if c.out.pb > c.out.mp/2 && c.out.stc == nil {
// If the outbound connection is > 75% of maximum pending allowed, create a stall gate.
if c.out.pb > c.out.mp/4*3 && c.out.stc == nil {
c.out.stc = make(chan struct{})
}
}
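
Both stall-gate thresholds above move from one half to three quarters of the maximum pending limit: queueOutbound now creates the gate once pending bytes exceed 75% of mp, and flushOutbound releases it once the buffer is fully flushed or drains back below 75%. A minimal, self-contained sketch of that create/release hysteresis (the stallGate type and method names are hypothetical; only the conditions mirror the diff):

package main

import "fmt"

// stallGate mirrors the conditions in queueOutbound/flushOutbound:
// open above 75% of max pending, close when fully flushed or back under 75%.
type stallGate struct {
	stc chan struct{} // producers block on this channel while stalled
}

func (g *stallGate) maybeCreate(pb, mp int64) {
	if pb > mp/4*3 && g.stc == nil {
		g.stc = make(chan struct{})
	}
}

func (g *stallGate) maybeRelease(flushedAll bool, pb, mp int64) {
	if g.stc != nil && (flushedAll || pb < mp/4*3) {
		close(g.stc)
		g.stc = nil
	}
}

func main() {
	var g stallGate
	g.maybeCreate(80, 100) // 80% pending: gate opens
	fmt.Println("gated:", g.stc != nil)
	g.maybeRelease(false, 70, 100) // back under 75%: gate closes
	fmt.Println("gated:", g.stc != nil)
}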
@@ -3337,31 +3347,36 @@ func (c *client) msgHeader(subj, reply []byte, sub *subscription) []byte {
 }
 
 func (c *client) stalledWait(producer *client) {
+	// Check to see if we have exceeded our total wait time per readLoop invocation.
+	if producer.in.tst > stallTotalAllowed {
+		return
+	}
+
 	// Grab stall channel which the slow consumer will close when caught up.
 	stall := c.out.stc
-	ttl := stallDuration(c.out.pb, c.out.mp)
 
 	c.mu.Unlock()
 	defer c.mu.Lock()
 
+	// Calculate stall time.
+	ttl := stallClientMinDuration
+	if c.out.pb >= c.out.mp {
+		ttl = stallClientMaxDuration
+	}
+	// Now check if we are close to total allowed.
+	if producer.in.tst+ttl > stallTotalAllowed {
+		ttl = stallTotalAllowed - producer.in.tst
+	}
 	delay := time.NewTimer(ttl)
 	defer delay.Stop()
 
+	start := time.Now()
 	select {
 	case <-stall:
 	case <-delay.C:
 		producer.Debugf("Timed out of fast producer stall (%v)", ttl)
 	}
-}
-
-func stallDuration(pb, mp int64) time.Duration {
-	ttl := stallClientMinDuration
-	if pb >= mp {
-		ttl = stallClientMaxDuration
-	} else if hmp := mp / 2; pb > hmp {
-		bsz := hmp / 10
-		additional := int64(ttl) * ((pb - hmp) / bsz)
-		ttl += time.Duration(additional)
-	}
-	return ttl
+	producer.in.tst += time.Since(start)
 }
 
 // Used to treat maps as efficient set
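
Net effect of the stalledWait rewrite: the old graduated backoff (100ms minimum up to a full second, computed by the now-deleted stallDuration helper) becomes a short, budgeted wait of 2-5ms per stall, capped at 10ms total per readLoop invocation. A sketch of the effective TTL calculation, pulled out into a standalone function for readability (the stallTTL helper name is hypothetical; the new code inlines this logic):

package main

import (
	"fmt"
	"time"
)

const (
	stallClientMinDuration = 2 * time.Millisecond
	stallClientMaxDuration = 5 * time.Millisecond
	stallTotalAllowed      = 10 * time.Millisecond
)

// stallTTL computes the wait the same way the new stalledWait does inline:
// 2ms normally, 5ms when the outbound buffer is completely full, and never
// more than what remains of the 10ms per-readLoop budget.
func stallTTL(pb, mp int64, alreadyStalled time.Duration) time.Duration {
	ttl := stallClientMinDuration
	if pb >= mp {
		ttl = stallClientMaxDuration
	}
	if alreadyStalled+ttl > stallTotalAllowed {
		ttl = stallTotalAllowed - alreadyStalled
	}
	return ttl
}

func main() {
	fmt.Println(stallTTL(80, 100, 0))                   // 2ms
	fmt.Println(stallTTL(120, 100, 0))                  // 5ms
	fmt.Println(stallTTL(120, 100, 8*time.Millisecond)) // clamped to remaining 2ms
}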
26 changes: 0 additions & 26 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2062,32 +2062,6 @@ func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) {
}
}

func TestClientStalledDuration(t *testing.T) {
for _, test := range []struct {
name string
pb int64
mp int64
expectedTTL time.Duration
}{
{"pb above mp", 110, 100, stallClientMaxDuration},
{"pb equal mp", 100, 100, stallClientMaxDuration},
{"pb below mp/2", 49, 100, stallClientMinDuration},
{"pb equal mp/2", 50, 100, stallClientMinDuration},
{"pb at 55% of mp", 55, 100, stallClientMinDuration + 1*stallClientMinDuration},
{"pb at 60% of mp", 60, 100, stallClientMinDuration + 2*stallClientMinDuration},
{"pb at 70% of mp", 70, 100, stallClientMinDuration + 4*stallClientMinDuration},
{"pb at 80% of mp", 80, 100, stallClientMinDuration + 6*stallClientMinDuration},
{"pb at 90% of mp", 90, 100, stallClientMinDuration + 8*stallClientMinDuration},
{"pb at 99% of mp", 99, 100, stallClientMinDuration + 9*stallClientMinDuration},
} {
t.Run(test.name, func(t *testing.T) {
if ttl := stallDuration(test.pb, test.mp); ttl != test.expectedTTL {
t.Fatalf("For pb=%v mp=%v, expected TTL to be %v, got %v", test.pb, test.mp, test.expectedTTL, ttl)
}
})
}
}

func TestClientIPv6Address(t *testing.T) {
opts := DefaultOptions()
opts.Host = "0.0.0.0"
Expand Down
44 changes: 25 additions & 19 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1749,11 +1749,16 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool {
if o.maxp > 0 && len(o.pending) >= o.maxp {
o.signalNewMessages()
}
// Cleanup our tracking.
delete(o.pending, seq)
if o.rdc != nil {
delete(o.rdc, seq)
// Make sure to remove from pending.
if p, ok := o.pending[seq]; ok && p != nil {
delete(o.pending, seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
}
// Ensure redelivered state is set, if not already.
if o.rdc == nil {
o.rdc = make(map[uint64]uint64)
}
o.rdc[seq] = dc
return true
}
return false
Expand Down Expand Up @@ -2988,6 +2993,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
var needAck bool
var asflr, osseq uint64
var pending map[uint64]*Pending
var rdc map[uint64]uint64

o.mu.RLock()
defer o.mu.RUnlock()
Expand All @@ -3012,7 +3018,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
}
if o.isLeader() {
asflr, osseq = o.asflr, o.sseq
pending = o.pending
pending, rdc = o.pending, o.rdc
} else {
if o.store == nil {
return false
Expand All @@ -3023,7 +3029,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
return sseq > o.asflr && !o.isFiltered()
}
// If loading state as here, the osseq is +1.
asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending
asflr, osseq, pending, rdc = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending, state.Redelivered
}

switch o.cfg.AckPolicy {
Expand All @@ -3039,6 +3045,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
}
}

// Finally check if redelivery of this message is tracked.
// If the message is not pending, it should be preserved if it reached max delivery.
if !needAck {
_, needAck = rdc[sseq]
}

return needAck
}

Expand Down Expand Up @@ -3497,7 +3509,10 @@ func (o *consumer) deliveryCount(seq uint64) uint64 {
if o.rdc == nil {
return 1
}
return o.rdc[seq]
if dc := o.rdc[seq]; dc >= 1 {
return dc
}
return 1
}

// Increase the delivery count for this message.
Expand Down Expand Up @@ -3811,10 +3826,7 @@ func (o *consumer) checkAckFloor() {
// Check if this message was pending.
o.mu.RLock()
p, isPending := o.pending[seq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
}
rdc := o.deliveryCount(seq)
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
Expand All @@ -3832,10 +3844,7 @@ func (o *consumer) checkAckFloor() {
if p != nil {
dseq = p.Sequence
}
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
}
rdc := o.deliveryCount(seq)
toTerm = append(toTerm, seq, dseq, rdc)
}
}
Expand Down Expand Up @@ -5409,10 +5418,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {

// Check if this message was pending.
p, wasPending := o.pending[sseq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[sseq]
}
rdc := o.deliveryCount(sseq)

o.mu.Unlock()

Expand Down
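
The consumer.go changes all serve one fix: when a message reaches max deliveries, its redelivery count now stays in o.rdc instead of being deleted, so needAck keeps the message alive, and the ack-floor and NumRedelivered accounting go through the hardened deliveryCount accessor. A standalone sketch of that accessor's contract, assuming only what the diff shows (a nil map or a missing entry means first delivery):

package main

import "fmt"

// deliveryCount mirrors the accessor in the diff: a nil redelivery map or a
// missing/zero entry both mean the message has been delivered exactly once.
func deliveryCount(rdc map[uint64]uint64, seq uint64) uint64 {
	if rdc == nil {
		return 1
	}
	if dc := rdc[seq]; dc >= 1 {
		return dc
	}
	return 1
}

func main() {
	fmt.Println(deliveryCount(nil, 7))                     // 1
	fmt.Println(deliveryCount(map[uint64]uint64{7: 3}, 7)) // 3
	fmt.Println(deliveryCount(map[uint64]uint64{}, 7))     // 1
}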
11 changes: 5 additions & 6 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5323,23 +5323,23 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// With headers, high bit on total length will be set.
// total_len(4) sequence(8) timestamp(8) subj_len(2) subj hdr_len(4) hdr msg hash(8)

// First write header, etc.
var le = binary.LittleEndian
var hdr [msgHdrSize]byte

l := uint32(rl)
hasHeaders := len(mhdr) > 0
if hasHeaders {
l |= hbit
}

// Reserve space for the header on the underlying buffer.
mb.cache.buf = append(mb.cache.buf, make([]byte, msgHdrSize)...)
hdr := mb.cache.buf[len(mb.cache.buf)-msgHdrSize : len(mb.cache.buf)]
le.PutUint32(hdr[0:], l)
le.PutUint64(hdr[4:], seq)
le.PutUint64(hdr[12:], uint64(ts))
le.PutUint16(hdr[20:], uint16(len(subj)))

// Now write to underlying buffer.
mb.cache.buf = append(mb.cache.buf, hdr[:]...)
mb.cache.buf = append(mb.cache.buf, subj...)

if hasHeaders {
Expand All @@ -5353,13 +5353,12 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Calculate hash.
mb.hh.Reset()
mb.hh.Write(hdr[4:20])
mb.hh.Write([]byte(subj))
mb.hh.Write(stringToBytes(subj))
if hasHeaders {
mb.hh.Write(mhdr)
}
mb.hh.Write(msg)
checksum := mb.hh.Sum(nil)
// Grab last checksum
checksum := mb.hh.Sum(mb.lchk[:0:highwayhash.Size64])
copy(mb.lchk[0:], checksum)

// Update write through cache.
Expand Down
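
The filestore change removes two per-record copies and allocations: instead of encoding the record header into a local stack array and appending it, it reserves msgHdrSize bytes directly on the cache buffer and encodes in place, and it lets the hash write its sum into mb.lchk's backing array rather than allocating via Sum(nil). A small sketch of the reserve-in-place pattern under assumed sizes (recHdrSize and the field layout follow the layout comment in the diff; the function name is hypothetical):

package main

import (
	"encoding/binary"
	"fmt"
)

// total_len(4) + sequence(8) + timestamp(8) + subj_len(2) = 22 bytes,
// matching the record layout comment in the diff.
const recHdrSize = 22

// appendRecordHeader grows buf by recHdrSize and encodes the header in place,
// avoiding a separate stack array and a second copy into the buffer.
func appendRecordHeader(buf []byte, totalLen uint32, seq, ts uint64, subjLen uint16) []byte {
	buf = append(buf, make([]byte, recHdrSize)...)
	hdr := buf[len(buf)-recHdrSize:]
	le := binary.LittleEndian
	le.PutUint32(hdr[0:], totalLen)
	le.PutUint64(hdr[4:], seq)
	le.PutUint64(hdr[12:], ts)
	le.PutUint16(hdr[20:], subjLen)
	return buf
}

func main() {
	buf := appendRecordHeader(nil, 128, 42, 1700000000, 3)
	fmt.Println(len(buf), binary.LittleEndian.Uint64(buf[4:12])) // 22 42
}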
2 changes: 1 addition & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6601,7 +6601,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
require_Equal(t, ci.AckFloor.Consumer, 1)
require_Equal(t, ci.AckFloor.Stream, 1)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.NumRedelivered, 0)
require_Equal(t, ci.NumRedelivered, 1)
require_Equal(t, ci.NumPending, 0)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5427,7 +5427,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
require_Equal(t, a.AckFloor.Stream, 10)
}
require_Equal(t, a.NumPending, 40)
require_Equal(t, a.NumRedelivered, 0)
require_Equal(t, a.NumRedelivered, 10)
a.Cluster, b.Cluster = nil, nil
a.Delivered.Last, b.Delivered.Last = nil, nil
if !reflect.DeepEqual(a, b) {
Expand Down