Skip to content

Commit deb6350

Browse files
[FIXED] LeafNode: connection may fail on slow link (#5424)
Added the leafnode remote configuration parameter `first_info_timeout`, which is the amount of time that a server creating a leafnode connection will wait for the initial INFO from the remote server.
Resolves #5417
Signed-off-by: Ivan Kozlovic <[email protected]>
2 parents 5670206 + d57650a commit deb6350

File tree

4 files changed

+177
-2
lines changed

4 files changed

+177
-2
lines changed

server/config_check_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -2008,6 +2008,23 @@ func TestConfigCheck(t *testing.T) {
20082008
errorLine: 9,
20092009
errorPos: 9,
20102010
},
2011+
{
2012+
name: "invalid duration for remote leafnode first info timeout",
2013+
config: `
2014+
leafnodes {
2015+
port: -1
2016+
remotes [
2017+
{
2018+
url: "nats://127.0.0.1:123"
2019+
first_info_timeout: abc
2020+
}
2021+
]
2022+
}
2023+
`,
2024+
err: fmt.Errorf("error parsing first_info_timeout: time: invalid duration %q", "abc"),
2025+
errorLine: 7,
2026+
errorPos: 8,
2027+
},
20112028
{
20122029
name: "show warnings on empty configs without values",
20132030
config: ``,

server/leafnode.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
982982
c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)
983983

984984
var tlsFirst bool
985+
var infoTimeout time.Duration
985986
if remote != nil {
986987
solicited = true
987988
remote.Lock()
@@ -991,6 +992,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
991992
c.leaf.isSpoke = true
992993
}
993994
tlsFirst = remote.TLSHandshakeFirst
995+
infoTimeout = remote.FirstInfoTimeout
994996
remote.Unlock()
995997
c.acc = acc
996998
} else {
@@ -1048,7 +1050,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
10481050
}
10491051
}
10501052
// We need to wait for the info, but not for too long.
1051-
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
1053+
c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
10521054
}
10531055

10541056
// We will process the INFO from the readloop and finish by
@@ -2831,6 +2833,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
28312833
compress := remote.Websocket.Compression
28322834
// By default the server will mask outbound frames, but it can be disabled with this option.
28332835
noMasking := remote.Websocket.NoMasking
2836+
infoTimeout := remote.FirstInfoTimeout
28342837
remote.RUnlock()
28352838
// Will do the client-side TLS handshake if needed.
28362839
tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
@@ -2883,14 +2886,14 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
28832886
if noMasking {
28842887
req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
28852888
}
2889+
c.nc.SetDeadline(time.Now().Add(infoTimeout))
28862890
if err := req.Write(c.nc); err != nil {
28872891
return nil, WriteError, err
28882892
}
28892893

28902894
var resp *http.Response
28912895

28922896
br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
2893-
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
28942897
resp, err = http.ReadResponse(br, req)
28952898
if err == nil &&
28962899
(resp.StatusCode != 101 ||

server/leafnode_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -7607,3 +7607,147 @@ func TestLeafNodeLoopDetectionOnActualLoop(t *testing.T) {
76077607
t.Fatalf("Did not get any error regarding loop")
76087608
}
76097609
}
7610+
7611+
func TestLeafNodeConnectionSucceedsEvenWithDelayedFirstINFO(t *testing.T) {
7612+
for _, test := range []struct {
7613+
name string
7614+
websocket bool
7615+
}{
7616+
{"regular", false},
7617+
{"websocket", true},
7618+
} {
7619+
t.Run(test.name, func(t *testing.T) {
7620+
ob := DefaultOptions()
7621+
ob.ServerName = "HUB"
7622+
ob.LeafNode.Host = "127.0.0.1"
7623+
ob.LeafNode.Port = -1
7624+
ob.LeafNode.AuthTimeout = 10
7625+
if test.websocket {
7626+
ob.Websocket.Host = "127.0.0.1"
7627+
ob.Websocket.Port = -1
7628+
ob.Websocket.HandshakeTimeout = 10 * time.Second
7629+
ob.Websocket.AuthTimeout = 10
7630+
ob.Websocket.NoTLS = true
7631+
}
7632+
sb := RunServer(ob)
7633+
defer sb.Shutdown()
7634+
7635+
var port int
7636+
var scheme string
7637+
if test.websocket {
7638+
port = ob.Websocket.Port
7639+
scheme = wsSchemePrefix
7640+
} else {
7641+
port = ob.LeafNode.Port
7642+
scheme = "nats"
7643+
}
7644+
7645+
urlStr := fmt.Sprintf("%s://127.0.0.1:%d", scheme, port)
7646+
proxy := createNetProxy(1100*time.Millisecond, 1024*1024*1024, 1024*1024*1024, urlStr, true)
7647+
defer proxy.stop()
7648+
proxyURL := proxy.clientURL()
7649+
_, proxyPort, err := net.SplitHostPort(proxyURL[len(scheme)+3:])
7650+
require_NoError(t, err)
7651+
7652+
lnBURL, err := url.Parse(fmt.Sprintf("%s://127.0.0.1:%s", scheme, proxyPort))
7653+
require_NoError(t, err)
7654+
7655+
oa := DefaultOptions()
7656+
oa.ServerName = "SPOKE"
7657+
oa.Cluster.Name = "xyz"
7658+
remote := &RemoteLeafOpts{
7659+
URLs: []*url.URL{lnBURL},
7660+
FirstInfoTimeout: 3 * time.Second,
7661+
}
7662+
oa.LeafNode.Remotes = []*RemoteLeafOpts{remote}
7663+
sa := RunServer(oa)
7664+
defer sa.Shutdown()
7665+
7666+
checkLeafNodeConnected(t, sa)
7667+
})
7668+
}
7669+
}
7670+
7671+
type captureLeafConnClosed struct {
7672+
DummyLogger
7673+
ch chan struct{}
7674+
}
7675+
7676+
func (l *captureLeafConnClosed) Noticef(format string, v ...any) {
7677+
msg := fmt.Sprintf(format, v...)
7678+
if strings.Contains(msg, "Leafnode connection closed: Read Error") {
7679+
select {
7680+
case l.ch <- struct{}{}:
7681+
default:
7682+
}
7683+
}
7684+
}
7685+
7686+
func TestLeafNodeDetectsStaleConnectionIfNoInfo(t *testing.T) {
7687+
for _, test := range []struct {
7688+
name string
7689+
websocket bool
7690+
}{
7691+
{"regular", false},
7692+
{"websocket", true},
7693+
} {
7694+
t.Run(test.name, func(t *testing.T) {
7695+
l, err := net.Listen("tcp", "127.0.0.1:0")
7696+
require_NoError(t, err)
7697+
defer l.Close()
7698+
7699+
ch := make(chan struct{})
7700+
wg := sync.WaitGroup{}
7701+
wg.Add(1)
7702+
go func() {
7703+
defer wg.Done()
7704+
c, err := l.Accept()
7705+
if err != nil {
7706+
return
7707+
}
7708+
defer c.Close()
7709+
<-ch
7710+
}()
7711+
7712+
var scheme string
7713+
if test.websocket {
7714+
scheme = wsSchemePrefix
7715+
} else {
7716+
scheme = "nats"
7717+
}
7718+
urlStr := fmt.Sprintf("%s://%s", scheme, l.Addr())
7719+
lnBURL, err := url.Parse(urlStr)
7720+
require_NoError(t, err)
7721+
7722+
oa := DefaultOptions()
7723+
oa.ServerName = "SPOKE"
7724+
oa.Cluster.Name = "xyz"
7725+
remote := &RemoteLeafOpts{
7726+
URLs: []*url.URL{lnBURL},
7727+
FirstInfoTimeout: 250 * time.Millisecond,
7728+
}
7729+
oa.LeafNode.Remotes = []*RemoteLeafOpts{remote}
7730+
oa.DisableShortFirstPing = false
7731+
oa.NoLog = false
7732+
sa, err := NewServer(oa)
7733+
require_NoError(t, err)
7734+
defer sa.Shutdown()
7735+
7736+
log := &captureLeafConnClosed{ch: make(chan struct{}, 1)}
7737+
sa.SetLogger(log, false, false)
7738+
sa.Start()
7739+
7740+
select {
7741+
case <-log.ch:
7742+
// OK
7743+
case <-time.After(750 * time.Millisecond):
7744+
t.Fatalf("Connection was not closed")
7745+
}
7746+
7747+
sa.Shutdown()
7748+
close(ch)
7749+
wg.Wait()
7750+
sa.WaitForShutdown()
7751+
})
7752+
}
7753+
}

server/opts.go

+11
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ type RemoteLeafOpts struct {
205205
DenyImports []string `json:"-"`
206206
DenyExports []string `json:"-"`
207207

208+
// FirstInfoTimeout is the amount of time the server will wait for the
209+
// initial INFO protocol from the remote server before closing the
210+
// connection.
211+
FirstInfoTimeout time.Duration `json:"-"`
212+
208213
// Compression options for this remote. Each remote could have a different
209214
// setting and also be different from the LeafNode options.
210215
Compression CompressionOpts `json:"-"`
@@ -2668,6 +2673,8 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL
26682673
*errors = append(*errors, err)
26692674
continue
26702675
}
2676+
case "first_info_timeout":
2677+
remote.FirstInfoTimeout = parseDuration(k, tk, v, errors, warnings)
26712678
default:
26722679
if !tk.IsUsedVariable() {
26732680
err := &unknownConfigFieldErr{
@@ -5376,6 +5383,10 @@ func setBaselineOptions(opts *Options) {
53765383
c.Mode = CompressionS2Auto
53775384
}
53785385
}
5386+
// Set default first info timeout value if not set.
5387+
if r.FirstInfoTimeout <= 0 {
5388+
r.FirstInfoTimeout = DEFAULT_LEAFNODE_INFO_WAIT
5389+
}
53795390
}
53805391
}
53815392

0 commit comments

Comments
 (0)