diff --git a/.travis.yml b/.travis.yml
index eb03f2663b0..8a502709b9b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,7 +9,6 @@ go:
 # This should be quoted or use .x, but should not be unquoted.
 # Remember that a YAML bare float drops trailing zeroes.
 - "1.23.6"
-- "1.22.12"
 
 go_import_path: github.com/nats-io/nats-server
 
@@ -46,9 +45,6 @@ jobs:
       env: TEST_SUITE=srv_pkg_non_js_tests
     - name: "Run all tests from all other packages"
      env: TEST_SUITE=non_srv_pkg_tests
-    - name: "Compile with older Go release"
-      go: "1.21.x"
-      env: TEST_SUITE=build_only
 
 script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
 
diff --git a/go.mod b/go.mod
index d89f8013655..ae1f8ac4f7b 100644
--- a/go.mod
+++ b/go.mod
@@ -1,18 +1,18 @@
 module github.com/nats-io/nats-server/v2
 
-go 1.22.0
+go 1.23.0
 
-toolchain go1.22.8
+toolchain go1.23.6
 
 require (
 	github.com/klauspost/compress v1.18.0
 	github.com/minio/highwayhash v1.0.3
 	github.com/nats-io/jwt/v2 v2.7.3
-	github.com/nats-io/nats.go v1.39.0
+	github.com/nats-io/nats.go v1.39.1
 	github.com/nats-io/nkeys v0.4.10
 	github.com/nats-io/nuid v1.0.1
 	go.uber.org/automaxprocs v1.6.0
-	golang.org/x/crypto v0.33.0
+	golang.org/x/crypto v0.34.0
 	golang.org/x/sys v0.30.0
 	golang.org/x/time v0.10.0
 )
diff --git a/go.sum b/go.sum
index 67bbd5ac398..fb6deedbe76 100644
--- a/go.sum
+++ b/go.sum
@@ -6,8 +6,8 @@ github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD
 github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
 github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
 github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
-github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI=
-github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
+github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk=
+github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
 github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
 github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
 github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
 go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
-golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
-golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
+golang.org/x/crypto v0.34.0 h1:+/C6tk6rf/+t5DhUketUbD1aNGqiSX3j15Z6xuIDlBA=
+golang.org/x/crypto v0.34.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
 golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
 golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
diff --git a/server/client.go b/server/client.go
index 1556fcb49ee..e3724bc3e54 100644
--- a/server/client.go
+++ b/server/client.go
@@ -113,8 +113,9 @@ const (
 	maxNoRTTPingBeforeFirstPong = 2 * time.Second
 
 	// For stalling fast producers
-	stallClientMinDuration = 100 * time.Millisecond
-	stallClientMaxDuration = time.Second
+	stallClientMinDuration = 2 * time.Millisecond
+	stallClientMaxDuration = 5 * time.Millisecond
+	stallTotalAllowed      = 10 * time.Millisecond
 )
 
 var readLoopReportThreshold = readLoopReport
@@ -462,6 +463,9 @@ type readCache struct {
 
 	// Capture the time we started processing our readLoop.
 	start time.Time
+
+	// Total time stalled so far for readLoop processing.
+	tst time.Duration
 }
 
 // set the flag (would be equivalent to set the boolean to true)
@@ -1414,6 +1418,11 @@ func (c *client) readLoop(pre []byte) {
 			}
 			return
 		}
+		// Clear total stalled time here.
+		if c.in.tst >= stallClientMaxDuration {
+			c.rateLimitFormatWarnf("Producer was stalled for a total of %v", c.in.tst.Round(time.Millisecond))
+		}
+		c.in.tst = 0
 	}
 
 	// If we are a ROUTER/LEAF and have processed an INFO, it is possible that
@@ -1730,7 +1739,7 @@ func (c *client) flushOutbound() bool {
 
 	// Check if we have a stalled gate and if so and we are recovering release
 	// any stalled producers. Only kind==CLIENT will stall.
-	if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/2) {
+	if c.out.stc != nil && (n == attempted || c.out.pb < c.out.mp/4*3) {
 		close(c.out.stc)
 		c.out.stc = nil
 	}
@@ -2292,7 +2301,8 @@ func (c *client) queueOutbound(data []byte) {
 	// Check here if we should create a stall channel if we are falling behind.
 	// We do this here since if we wait for consumer's writeLoop it could be
 	// too late with large number of fan in producers.
-	if c.out.pb > c.out.mp/2 && c.out.stc == nil {
+	// If the outbound connection is > 75% of maximum pending allowed, create a stall gate.
+	if c.out.pb > c.out.mp/4*3 && c.out.stc == nil {
 		c.out.stc = make(chan struct{})
 	}
 }
@@ -3337,31 +3347,36 @@ func (c *client) msgHeader(subj, reply []byte, sub *subscription) []byte {
 }
 
 func (c *client) stalledWait(producer *client) {
+	// Check to see if we have exceeded our total wait time per readLoop invocation.
+	if producer.in.tst > stallTotalAllowed {
+		return
+	}
+
+	// Grab stall channel which the slow consumer will close when caught up.
 	stall := c.out.stc
-	ttl := stallDuration(c.out.pb, c.out.mp)
+
 	c.mu.Unlock()
 	defer c.mu.Lock()
 
+	// Calculate stall time.
+	ttl := stallClientMinDuration
+	if c.out.pb >= c.out.mp {
+		ttl = stallClientMaxDuration
+	}
+	// Now check if we are close to total allowed.
+	if producer.in.tst+ttl > stallTotalAllowed {
+		ttl = stallTotalAllowed - producer.in.tst
+	}
 	delay := time.NewTimer(ttl)
 	defer delay.Stop()
 
+	start := time.Now()
 	select {
 	case <-stall:
 	case <-delay.C:
 		producer.Debugf("Timed out of fast producer stall (%v)", ttl)
 	}
-}
-
-func stallDuration(pb, mp int64) time.Duration {
-	ttl := stallClientMinDuration
-	if pb >= mp {
-		ttl = stallClientMaxDuration
-	} else if hmp := mp / 2; pb > hmp {
-		bsz := hmp / 10
-		additional := int64(ttl) * ((pb - hmp) / bsz)
-		ttl += time.Duration(additional)
-	}
-	return ttl
+	producer.in.tst += time.Since(start)
 }
 
 // Used to treat maps as efficient set
diff --git a/server/client_test.go b/server/client_test.go
index 8de1a5bafba..abf0cb89502 100644
--- a/server/client_test.go
+++ b/server/client_test.go
@@ -2062,32 +2062,6 @@ func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) {
 	}
 }
 
-func TestClientStalledDuration(t *testing.T) {
-	for _, test := range []struct {
-		name        string
-		pb          int64
-		mp          int64
-		expectedTTL time.Duration
-	}{
-		{"pb above mp", 110, 100, stallClientMaxDuration},
-		{"pb equal mp", 100, 100, stallClientMaxDuration},
-		{"pb below mp/2", 49, 100, stallClientMinDuration},
-		{"pb equal mp/2", 50, 100, stallClientMinDuration},
-		{"pb at 55% of mp", 55, 100, stallClientMinDuration + 1*stallClientMinDuration},
-		{"pb at 60% of mp", 60, 100, stallClientMinDuration + 2*stallClientMinDuration},
-		{"pb at 70% of mp", 70, 100, stallClientMinDuration + 4*stallClientMinDuration},
-		{"pb at 80% of mp", 80, 100, stallClientMinDuration + 6*stallClientMinDuration},
-		{"pb at 90% of mp", 90, 100, stallClientMinDuration + 8*stallClientMinDuration},
-		{"pb at 99% of mp", 99, 100, stallClientMinDuration + 9*stallClientMinDuration},
-	} {
-		t.Run(test.name, func(t *testing.T) {
-			if ttl := stallDuration(test.pb, test.mp); ttl != test.expectedTTL {
-				t.Fatalf("For pb=%v mp=%v, expected TTL to be %v, got %v", test.pb, test.mp, test.expectedTTL, ttl)
-			}
-		})
-	}
-}
-
 func TestClientIPv6Address(t *testing.T) {
 	opts := DefaultOptions()
 	opts.Host = "0.0.0.0"
diff --git a/server/consumer.go b/server/consumer.go
index dc5f7e58288..83f2f3ce83f 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1749,11 +1749,16 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool {
 		if o.maxp > 0 && len(o.pending) >= o.maxp {
 			o.signalNewMessages()
 		}
-		// Cleanup our tracking.
-		delete(o.pending, seq)
-		if o.rdc != nil {
-			delete(o.rdc, seq)
+		// Make sure to remove from pending.
+		if p, ok := o.pending[seq]; ok && p != nil {
+			delete(o.pending, seq)
+			o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
+		}
+		// Ensure redelivered state is set, if not already.
+		if o.rdc == nil {
+			o.rdc = make(map[uint64]uint64)
 		}
+		o.rdc[seq] = dc
 		return true
 	}
 	return false
@@ -2988,6 +2993,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
 	var needAck bool
 	var asflr, osseq uint64
 	var pending map[uint64]*Pending
+	var rdc map[uint64]uint64
 
 	o.mu.RLock()
 	defer o.mu.RUnlock()
@@ -3012,7 +3018,7 @@
 	}
 	if o.isLeader() {
 		asflr, osseq = o.asflr, o.sseq
-		pending = o.pending
+		pending, rdc = o.pending, o.rdc
 	} else {
 		if o.store == nil {
 			return false
 		}
@@ -3023,7 +3029,7 @@
 			return sseq > o.asflr && !o.isFiltered()
 		}
 		// If loading state as here, the osseq is +1.
-		asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending
+		asflr, osseq, pending, rdc = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending, state.Redelivered
 	}
 
 	switch o.cfg.AckPolicy {
@@ -3039,6 +3045,12 @@
 		}
 	}
 
+	// Finally check if redelivery of this message is tracked.
+	// If the message is not pending, it should be preserved if it reached max delivery.
+	if !needAck {
+		_, needAck = rdc[sseq]
+	}
+
 	return needAck
 }
 
@@ -3497,7 +3509,10 @@ func (o *consumer) deliveryCount(seq uint64) uint64 {
 	if o.rdc == nil {
 		return 1
 	}
-	return o.rdc[seq]
+	if dc := o.rdc[seq]; dc >= 1 {
+		return dc
+	}
+	return 1
 }
 
 // Increase the delivery count for this message.
@@ -3811,10 +3826,7 @@ func (o *consumer) checkAckFloor() {
 			// Check if this message was pending.
 			o.mu.RLock()
 			p, isPending := o.pending[seq]
-			var rdc uint64 = 1
-			if o.rdc != nil {
-				rdc = o.rdc[seq]
-			}
+			rdc := o.deliveryCount(seq)
 			o.mu.RUnlock()
 			// If it was pending for us, get rid of it.
 			if isPending {
@@ -3832,10 +3844,7 @@
 			if p != nil {
 				dseq = p.Sequence
 			}
-			var rdc uint64 = 1
-			if o.rdc != nil {
-				rdc = o.rdc[seq]
-			}
+			rdc := o.deliveryCount(seq)
 			toTerm = append(toTerm, seq, dseq, rdc)
 		}
 	}
@@ -5409,10 +5418,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
 
 	// Check if this message was pending.
 	p, wasPending := o.pending[sseq]
-	var rdc uint64 = 1
-	if o.rdc != nil {
-		rdc = o.rdc[sseq]
-	}
+	rdc := o.deliveryCount(sseq)
 	o.mu.Unlock()
 
diff --git a/server/filestore.go b/server/filestore.go
index 60829447afb..7168e76a453 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -5323,9 +5323,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
 	// With headers, high bit on total length will be set.
 	// total_len(4) sequence(8) timestamp(8) subj_len(2) subj hdr_len(4) hdr msg hash(8)
 
-	// First write header, etc.
 	var le = binary.LittleEndian
-	var hdr [msgHdrSize]byte
 
 	l := uint32(rl)
 	hasHeaders := len(mhdr) > 0
@@ -5333,13 +5331,15 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
 		l |= hbit
 	}
 
+	// Reserve space for the header on the underlying buffer.
+	mb.cache.buf = append(mb.cache.buf, make([]byte, msgHdrSize)...)
+	hdr := mb.cache.buf[len(mb.cache.buf)-msgHdrSize : len(mb.cache.buf)]
 	le.PutUint32(hdr[0:], l)
 	le.PutUint64(hdr[4:], seq)
 	le.PutUint64(hdr[12:], uint64(ts))
 	le.PutUint16(hdr[20:], uint16(len(subj)))
 
 	// Now write to underlying buffer.
-	mb.cache.buf = append(mb.cache.buf, hdr[:]...)
 	mb.cache.buf = append(mb.cache.buf, subj...)
 
 	if hasHeaders {
@@ -5353,13 +5353,12 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
 	// Calculate hash.
 	mb.hh.Reset()
 	mb.hh.Write(hdr[4:20])
-	mb.hh.Write([]byte(subj))
+	mb.hh.Write(stringToBytes(subj))
 	if hasHeaders {
 		mb.hh.Write(mhdr)
 	}
 	mb.hh.Write(msg)
-	checksum := mb.hh.Sum(nil)
-	// Grab last checksum
+	checksum := mb.hh.Sum(mb.lchk[:0:highwayhash.Size64])
 	copy(mb.lchk[0:], checksum)
 
 	// Update write through cache.
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index ca2eed3e507..6ae0afa8700 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -6601,7 +6601,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
 		require_Equal(t, ci.AckFloor.Consumer, 1)
 		require_Equal(t, ci.AckFloor.Stream, 1)
 		require_Equal(t, ci.NumAckPending, 0)
-		require_Equal(t, ci.NumRedelivered, 0)
+		require_Equal(t, ci.NumRedelivered, 1)
 		require_Equal(t, ci.NumPending, 0)
 	}
 }
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 13ca180bb02..18f4d4c15b4 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -5427,7 +5427,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 			require_Equal(t, a.AckFloor.Stream, 10)
 		}
 		require_Equal(t, a.NumPending, 40)
-		require_Equal(t, a.NumRedelivered, 0)
+		require_Equal(t, a.NumRedelivered, 10)
 		a.Cluster, b.Cluster = nil, nil
 		a.Delivered.Last, b.Delivered.Last = nil, nil
 		if !reflect.DeepEqual(a, b) {
diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go
index a45a30a13d5..4652c19d549 100644
--- a/server/jetstream_consumer_test.go
+++ b/server/jetstream_consumer_test.go
@@ -1707,3 +1707,56 @@ func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) {
 		})
 	}
 }
+
+func TestJetStreamConsumerDeliveryCount(t *testing.T) {
+	s := RunBasicJetStreamServer(t)
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	for i := 0; i < 2; i++ {
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
+	}
+
+	sub, err := js.PullSubscribe(
+		"foo",
+		"CONSUMER",
+		nats.ManualAck(),
+		nats.AckExplicit(),
+		nats.AckWait(time.Second),
+		nats.MaxDeliver(1),
+	)
+	require_NoError(t, err)
+
+	acc, err := s.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	o := mset.lookupConsumer("CONSUMER")
+	require_NotNil(t, o)
+
+	msgs, err := sub.Fetch(2)
+	require_NoError(t, err)
+	require_Len(t, len(msgs), 2)
+	require_NoError(t, msgs[1].Nak())
+
+	require_Equal(t, o.deliveryCount(1), 1)
+	require_Equal(t, o.deliveryCount(2), 1)
+
+	// max deliver 1 so this will fail
+	_, err = sub.Fetch(1, nats.MaxWait(250*time.Millisecond))
+	require_Error(t, err)
+
+	// This would previously report delivery count 0, because o.rdc != nil
+	require_Equal(t, o.deliveryCount(1), 1)
+	require_Equal(t, o.deliveryCount(2), 1)
+
+}
diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 0b183f6e208..8a9a26ec79a 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -23203,3 +23203,108 @@ func TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor(t *testing.T) {
 	// We've fetched 2 message so should report 0 pending.
 	requireExpected(0)
 }
+
+// https://github.com/nats-io/nats-server/issues/6538
+func TestJetStreamInterestMaxDeliveryReached(t *testing.T) {
+	maxWait := 250 * time.Millisecond
+	for _, useNak := range []bool{true, false} {
+		for _, test := range []struct {
+			title  string
+			action func(s *Server, sub *nats.Subscription)
+		}{
+			{
+				title: "fetch",
+				action: func(s *Server, sub *nats.Subscription) {
+					time.Sleep(time.Second)
+
+					// max deliver 1 so this will fail
+					_, err := sub.Fetch(1, nats.MaxWait(maxWait))
+					require_Error(t, err)
+				},
+			},
+			{
+				title: "expire pending",
+				action: func(s *Server, sub *nats.Subscription) {
+					acc, err := s.lookupAccount(globalAccountName)
+					require_NoError(t, err)
+					mset, err := acc.lookupStream("TEST")
+					require_NoError(t, err)
+					o := mset.lookupConsumer("consumer")
+					require_NotNil(t, o)
+
+					o.mu.Lock()
+					o.forceExpirePending()
+					o.mu.Unlock()
+				},
+			},
+		} {
+			title := fmt.Sprintf("nak/%s", test.title)
+			if !useNak {
+				title = fmt.Sprintf("no-%s", title)
+			}
+			t.Run(title, func(t *testing.T) {
+				s := RunBasicJetStreamServer(t)
+				defer s.Shutdown()
+
+				nc, js := jsClientConnect(t, s)
+				defer nc.Close()
+
+				_, err := js.AddStream(&nats.StreamConfig{
+					Name:      "TEST",
+					Storage:   nats.FileStorage,
+					Subjects:  []string{"test"},
+					Replicas:  1,
+					Retention: nats.InterestPolicy,
+				})
+				require_NoError(t, err)
+
+				sub, err := js.PullSubscribe("test", "consumer", nats.AckWait(time.Second), nats.MaxDeliver(1))
+				require_NoError(t, err)
+
+				_, err = nc.Request("test", []byte("hello"), maxWait)
+				require_NoError(t, err)
+
+				nfo, err := js.StreamInfo("TEST")
+				require_NoError(t, err)
+				require_Equal(t, nfo.State.Msgs, uint64(1))
+
+				msg, err := sub.Fetch(1, nats.MaxWait(maxWait))
+				require_NoError(t, err)
+				require_Len(t, 1, len(msg))
+				if useNak {
+					require_NoError(t, msg[0].Nak())
+				}
+
+				cnfo, err := js.ConsumerInfo("TEST", "consumer")
+				require_NoError(t, err)
+				require_Equal(t, cnfo.NumAckPending, 1)
+
+				test.action(s, sub)
+
+				// max deliver 1 so this will fail
+				_, err = sub.Fetch(1, nats.MaxWait(maxWait))
+				require_Error(t, err)
+
+				cnfo, err = js.ConsumerInfo("TEST", "consumer")
+				require_NoError(t, err)
+				require_Equal(t, cnfo.NumAckPending, 0)
+
+				nfo, err = js.StreamInfo("TEST")
+				require_NoError(t, err)
+				require_Equal(t, nfo.State.Msgs, uint64(1))
+
+				sub2, err := js.PullSubscribe("test", "consumer2")
+				require_NoError(t, err)
+
+				msg, err = sub2.Fetch(1)
+				require_NoError(t, err)
+				require_Len(t, 1, len(msg))
+				require_NoError(t, msg[0].AckSync())
+
+				nfo, err = js.StreamInfo("TEST")
+				require_NoError(t, err)
+				require_Equal(t, nfo.State.Msgs, uint64(1))
+			})
+		}
+	}
+}
diff --git a/server/norace_test.go b/server/norace_test.go
index 8c9785f505c..9f58bd81b11 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -11502,3 +11502,61 @@ func TestNoRaceNoFastProducerStall(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func TestNoRaceProducerStallLimits(t *testing.T) {
+	tmpl := `
+	listen: "127.0.0.1:-1"
+	`
+	conf := createConfFile(t, []byte(tmpl))
+	s, _ := RunServerWithConfig(conf)
+	defer s.Shutdown()
+
+	ncSlow := natsConnect(t, s.ClientURL())
+	defer ncSlow.Close()
+	natsSub(t, ncSlow, "foo", func(m *nats.Msg) { m.Respond([]byte("42")) })
+	natsFlush(t, ncSlow)
+
+	ncProd := natsConnect(t, s.ClientURL())
+	defer ncProd.Close()
+
+	cid, err := ncSlow.GetClientID()
+	require_NoError(t, err)
+	c := s.GetClient(cid)
+	require_True(t, c != nil)
+
+	// Artificially set a stall channel on the subscriber.
+	c.mu.Lock()
+	c.out.stc = make(chan struct{})
+	c.mu.Unlock()
+
+	start := time.Now()
+	_, err = ncProd.Request("foo", []byte("HELLO"), time.Second)
+	elapsed := time.Since(start)
+	require_NoError(t, err)
+
+	// This should not have cleared on its own, but should have paused for between the min and max stall durations.
+	require_True(t, elapsed >= stallClientMinDuration)
+	require_True(t, elapsed < stallClientMaxDuration)
+
+	// Now test total maximum by loading up a bunch of requests and measuring the last one.
+	// Artificially set a stall channel again on the subscriber.
+	c.mu.Lock()
+	c.out.stc = make(chan struct{})
+	// This will prevent us from clearing the stc.
+	c.out.pb = c.out.mp/4*3 + 100
+	c.mu.Unlock()
+
+	for i := 0; i < 10; i++ {
+		err = ncProd.PublishRequest("foo", "bar", []byte("HELLO"))
+		require_NoError(t, err)
+	}
+	start = time.Now()
+	_, err = ncProd.Request("foo", []byte("HELLO"), time.Second)
+	elapsed = time.Since(start)
+	require_NoError(t, err)
+
+	require_True(t, elapsed >= stallTotalAllowed)
+	// Should always be close to stallTotalAllowed (e.g. 10ms), but running a lot of these in one go can bump it
+	// just past 12ms, hence adding stallClientMaxDuration below to avoid a flapper.
+	require_True(t, elapsed < stallTotalAllowed+stallClientMaxDuration)
+}