Skip to content

Commit ecf7237

Browse files
Cherry-picks for 2.10.26-RC.6 (#6578)
Includes the following:
- #6574
- #6568
- #6575
- #6576

Signed-off-by: Neil Twigg <[email protected]>
2 parents 9f1a9cb + 4ec9337 commit ecf7237

12 files changed

+288
-82
lines changed

.travis.yml

-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ go:
99
# This should be quoted or use .x, but should not be unquoted.
1010
# Remember that a YAML bare float drops trailing zeroes.
1111
- "1.23.6"
12-
- "1.22.12"
1312

1413
go_import_path: github.com/nats-io/nats-server
1514

@@ -46,9 +45,6 @@ jobs:
4645
env: TEST_SUITE=srv_pkg_non_js_tests
4746
- name: "Run all tests from all other packages"
4847
env: TEST_SUITE=non_srv_pkg_tests
49-
- name: "Compile with older Go release"
50-
go: "1.21.x"
51-
env: TEST_SUITE=build_only
5248

5349
script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
5450

go.mod

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
module github.com/nats-io/nats-server/v2
22

3-
go 1.22.0
3+
go 1.23.0
44

5-
toolchain go1.22.8
5+
toolchain go1.23.6
66

77
require (
88
github.com/klauspost/compress v1.18.0
99
github.com/minio/highwayhash v1.0.3
1010
github.com/nats-io/jwt/v2 v2.7.3
11-
github.com/nats-io/nats.go v1.39.0
11+
github.com/nats-io/nats.go v1.39.1
1212
github.com/nats-io/nkeys v0.4.10
1313
github.com/nats-io/nuid v1.0.1
1414
go.uber.org/automaxprocs v1.6.0
15-
golang.org/x/crypto v0.33.0
15+
golang.org/x/crypto v0.34.0
1616
golang.org/x/sys v0.30.0
1717
golang.org/x/time v0.10.0
1818
)

go.sum

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD
66
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
77
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
88
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
9-
github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI=
10-
github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
9+
github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk=
10+
github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
1111
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
1212
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
1313
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
2020
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2121
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
2222
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
23-
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
24-
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
23+
golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA=
24+
golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
2525
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2626
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
2727
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

server/client.go

+32-17
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ const (
113113
maxNoRTTPingBeforeFirstPong = 2 * time.Second
114114

115115
// For stalling fast producers
116-
stallClientMinDuration = 100 * time.Millisecond
117-
stallClientMaxDuration = time.Second
116+
stallClientMinDuration = 2 * time.Millisecond
117+
stallClientMaxDuration = 5 * time.Millisecond
118+
stallTotalAllowed = 10 * time.Millisecond
118119
)
119120

120121
var readLoopReportThreshold = readLoopReport
@@ -462,6 +463,9 @@ type readCache struct {
462463

463464
// Capture the time we started processing our readLoop.
464465
start time.Time
466+
467+
// Total time stalled so far for readLoop processing.
468+
tst time.Duration
465469
}
466470

467471
// set the flag (equivalent to setting the boolean to true)
@@ -1414,6 +1418,11 @@ func (c *client) readLoop(pre []byte) {
14141418
}
14151419
return
14161420
}
1421+
// Clear total stalled time here.
1422+
if c.in.tst >= stallClientMaxDuration {
1423+
c.rateLimitFormatWarnf("Producer was stalled for a total of %v", c.in.tst.Round(time.Millisecond))
1424+
}
1425+
c.in.tst = 0
14171426
}
14181427

14191428
// If we are a ROUTER/LEAF and have processed an INFO, it is possible that
@@ -1730,7 +1739,7 @@ func (c *client) flushOutbound() bool {
17301739

17311740
// Check if we have a stalled gate and, if so and we are recovering, release
17321741
// any stalled producers. Only kind==CLIENT will stall.
1733-
if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/2) {
1742+
if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/4*3) {
17341743
close(c.out.stc)
17351744
c.out.stc = nil
17361745
}
@@ -2292,7 +2301,8 @@ func (c *client) queueOutbound(data []byte) {
22922301
// Check here if we should create a stall channel if we are falling behind.
22932302
// We do this here since if we wait for consumer's writeLoop it could be
22942303
// too late with large number of fan in producers.
2295-
if c.out.pb > c.out.mp/2 && c.out.stc == nil {
2304+
// If the outbound connection is > 75% of maximum pending allowed, create a stall gate.
2305+
if c.out.pb > c.out.mp/4*3 && c.out.stc == nil {
22962306
c.out.stc = make(chan struct{})
22972307
}
22982308
}
@@ -3337,31 +3347,36 @@ func (c *client) msgHeader(subj, reply []byte, sub *subscription) []byte {
33373347
}
33383348

33393349
func (c *client) stalledWait(producer *client) {
3350+
// Check to see if we have exceeded our total wait time per readLoop invocation.
3351+
if producer.in.tst > stallTotalAllowed {
3352+
return
3353+
}
3354+
3355+
// Grab stall channel which the slow consumer will close when caught up.
33403356
stall := c.out.stc
3341-
ttl := stallDuration(c.out.pb, c.out.mp)
3357+
33423358
c.mu.Unlock()
33433359
defer c.mu.Lock()
33443360

3361+
// Calculate stall time.
3362+
ttl := stallClientMinDuration
3363+
if c.out.pb >= c.out.mp {
3364+
ttl = stallClientMaxDuration
3365+
}
3366+
// Now check if we are close to total allowed.
3367+
if producer.in.tst+ttl > stallTotalAllowed {
3368+
ttl = stallTotalAllowed - producer.in.tst
3369+
}
33453370
delay := time.NewTimer(ttl)
33463371
defer delay.Stop()
33473372

3373+
start := time.Now()
33483374
select {
33493375
case <-stall:
33503376
case <-delay.C:
33513377
producer.Debugf("Timed out of fast producer stall (%v)", ttl)
33523378
}
3353-
}
3354-
3355-
func stallDuration(pb, mp int64) time.Duration {
3356-
ttl := stallClientMinDuration
3357-
if pb >= mp {
3358-
ttl = stallClientMaxDuration
3359-
} else if hmp := mp / 2; pb > hmp {
3360-
bsz := hmp / 10
3361-
additional := int64(ttl) * ((pb - hmp) / bsz)
3362-
ttl += time.Duration(additional)
3363-
}
3364-
return ttl
3379+
producer.in.tst += time.Since(start)
33653380
}
33663381

33673382
// Used to treat maps as efficient set

server/client_test.go

-26
Original file line numberDiff line numberDiff line change
@@ -2062,32 +2062,6 @@ func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) {
20622062
}
20632063
}
20642064

2065-
func TestClientStalledDuration(t *testing.T) {
2066-
for _, test := range []struct {
2067-
name string
2068-
pb int64
2069-
mp int64
2070-
expectedTTL time.Duration
2071-
}{
2072-
{"pb above mp", 110, 100, stallClientMaxDuration},
2073-
{"pb equal mp", 100, 100, stallClientMaxDuration},
2074-
{"pb below mp/2", 49, 100, stallClientMinDuration},
2075-
{"pb equal mp/2", 50, 100, stallClientMinDuration},
2076-
{"pb at 55% of mp", 55, 100, stallClientMinDuration + 1*stallClientMinDuration},
2077-
{"pb at 60% of mp", 60, 100, stallClientMinDuration + 2*stallClientMinDuration},
2078-
{"pb at 70% of mp", 70, 100, stallClientMinDuration + 4*stallClientMinDuration},
2079-
{"pb at 80% of mp", 80, 100, stallClientMinDuration + 6*stallClientMinDuration},
2080-
{"pb at 90% of mp", 90, 100, stallClientMinDuration + 8*stallClientMinDuration},
2081-
{"pb at 99% of mp", 99, 100, stallClientMinDuration + 9*stallClientMinDuration},
2082-
} {
2083-
t.Run(test.name, func(t *testing.T) {
2084-
if ttl := stallDuration(test.pb, test.mp); ttl != test.expectedTTL {
2085-
t.Fatalf("For pb=%v mp=%v, expected TTL to be %v, got %v", test.pb, test.mp, test.expectedTTL, ttl)
2086-
}
2087-
})
2088-
}
2089-
}
2090-
20912065
func TestClientIPv6Address(t *testing.T) {
20922066
opts := DefaultOptions()
20932067
opts.Host = "0.0.0.0"

server/consumer.go

+25-19
Original file line numberDiff line numberDiff line change
@@ -1749,11 +1749,16 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool {
17491749
if o.maxp > 0 && len(o.pending) >= o.maxp {
17501750
o.signalNewMessages()
17511751
}
1752-
// Cleanup our tracking.
1753-
delete(o.pending, seq)
1754-
if o.rdc != nil {
1755-
delete(o.rdc, seq)
1752+
// Make sure to remove from pending.
1753+
if p, ok := o.pending[seq]; ok && p != nil {
1754+
delete(o.pending, seq)
1755+
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
1756+
}
1757+
// Ensure redelivered state is set, if not already.
1758+
if o.rdc == nil {
1759+
o.rdc = make(map[uint64]uint64)
17561760
}
1761+
o.rdc[seq] = dc
17571762
return true
17581763
}
17591764
return false
@@ -2988,6 +2993,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
29882993
var needAck bool
29892994
var asflr, osseq uint64
29902995
var pending map[uint64]*Pending
2996+
var rdc map[uint64]uint64
29912997

29922998
o.mu.RLock()
29932999
defer o.mu.RUnlock()
@@ -3012,7 +3018,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
30123018
}
30133019
if o.isLeader() {
30143020
asflr, osseq = o.asflr, o.sseq
3015-
pending = o.pending
3021+
pending, rdc = o.pending, o.rdc
30163022
} else {
30173023
if o.store == nil {
30183024
return false
@@ -3023,7 +3029,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
30233029
return sseq > o.asflr && !o.isFiltered()
30243030
}
30253031
// If loading state as here, the osseq is +1.
3026-
asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending
3032+
asflr, osseq, pending, rdc = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending, state.Redelivered
30273033
}
30283034

30293035
switch o.cfg.AckPolicy {
@@ -3039,6 +3045,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
30393045
}
30403046
}
30413047

3048+
// Finally check if redelivery of this message is tracked.
3049+
// If the message is not pending, it should be preserved if it reached max delivery.
3050+
if !needAck {
3051+
_, needAck = rdc[sseq]
3052+
}
3053+
30423054
return needAck
30433055
}
30443056

@@ -3497,7 +3509,10 @@ func (o *consumer) deliveryCount(seq uint64) uint64 {
34973509
if o.rdc == nil {
34983510
return 1
34993511
}
3500-
return o.rdc[seq]
3512+
if dc := o.rdc[seq]; dc >= 1 {
3513+
return dc
3514+
}
3515+
return 1
35013516
}
35023517

35033518
// Increase the delivery count for this message.
@@ -3811,10 +3826,7 @@ func (o *consumer) checkAckFloor() {
38113826
// Check if this message was pending.
38123827
o.mu.RLock()
38133828
p, isPending := o.pending[seq]
3814-
var rdc uint64 = 1
3815-
if o.rdc != nil {
3816-
rdc = o.rdc[seq]
3817-
}
3829+
rdc := o.deliveryCount(seq)
38183830
o.mu.RUnlock()
38193831
// If it was pending for us, get rid of it.
38203832
if isPending {
@@ -3832,10 +3844,7 @@ func (o *consumer) checkAckFloor() {
38323844
if p != nil {
38333845
dseq = p.Sequence
38343846
}
3835-
var rdc uint64 = 1
3836-
if o.rdc != nil {
3837-
rdc = o.rdc[seq]
3838-
}
3847+
rdc := o.deliveryCount(seq)
38393848
toTerm = append(toTerm, seq, dseq, rdc)
38403849
}
38413850
}
@@ -5409,10 +5418,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
54095418

54105419
// Check if this message was pending.
54115420
p, wasPending := o.pending[sseq]
5412-
var rdc uint64 = 1
5413-
if o.rdc != nil {
5414-
rdc = o.rdc[sseq]
5415-
}
5421+
rdc := o.deliveryCount(sseq)
54165422

54175423
o.mu.Unlock()
54185424

server/filestore.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -5323,23 +5323,23 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
53235323
// With headers, high bit on total length will be set.
53245324
// total_len(4) sequence(8) timestamp(8) subj_len(2) subj hdr_len(4) hdr msg hash(8)
53255325

5326-
// First write header, etc.
53275326
var le = binary.LittleEndian
5328-
var hdr [msgHdrSize]byte
53295327

53305328
l := uint32(rl)
53315329
hasHeaders := len(mhdr) > 0
53325330
if hasHeaders {
53335331
l |= hbit
53345332
}
53355333

5334+
// Reserve space for the header on the underlying buffer.
5335+
mb.cache.buf = append(mb.cache.buf, make([]byte, msgHdrSize)...)
5336+
hdr := mb.cache.buf[len(mb.cache.buf)-msgHdrSize : len(mb.cache.buf)]
53365337
le.PutUint32(hdr[0:], l)
53375338
le.PutUint64(hdr[4:], seq)
53385339
le.PutUint64(hdr[12:], uint64(ts))
53395340
le.PutUint16(hdr[20:], uint16(len(subj)))
53405341

53415342
// Now write to underlying buffer.
5342-
mb.cache.buf = append(mb.cache.buf, hdr[:]...)
53435343
mb.cache.buf = append(mb.cache.buf, subj...)
53445344

53455345
if hasHeaders {
@@ -5353,13 +5353,12 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
53535353
// Calculate hash.
53545354
mb.hh.Reset()
53555355
mb.hh.Write(hdr[4:20])
5356-
mb.hh.Write([]byte(subj))
5356+
mb.hh.Write(stringToBytes(subj))
53575357
if hasHeaders {
53585358
mb.hh.Write(mhdr)
53595359
}
53605360
mb.hh.Write(msg)
5361-
checksum := mb.hh.Sum(nil)
5362-
// Grab last checksum
5361+
checksum := mb.hh.Sum(mb.lchk[:0:highwayhash.Size64])
53635362
copy(mb.lchk[0:], checksum)
53645363

53655364
// Update write through cache.

server/jetstream_cluster_1_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -6601,7 +6601,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
66016601
require_Equal(t, ci.AckFloor.Consumer, 1)
66026602
require_Equal(t, ci.AckFloor.Stream, 1)
66036603
require_Equal(t, ci.NumAckPending, 0)
6604-
require_Equal(t, ci.NumRedelivered, 0)
6604+
require_Equal(t, ci.NumRedelivered, 1)
66056605
require_Equal(t, ci.NumPending, 0)
66066606
}
66076607
}

server/jetstream_cluster_3_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -5427,7 +5427,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
54275427
require_Equal(t, a.AckFloor.Stream, 10)
54285428
}
54295429
require_Equal(t, a.NumPending, 40)
5430-
require_Equal(t, a.NumRedelivered, 0)
5430+
require_Equal(t, a.NumRedelivered, 10)
54315431
a.Cluster, b.Cluster = nil, nil
54325432
a.Delivered.Last, b.Delivered.Last = nil, nil
54335433
if !reflect.DeepEqual(a, b) {

0 commit comments

Comments
 (0)