Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry: add TCP RTT info collection #4745

Merged
merged 20 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions logging/telemetryspec/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package telemetryspec

import (
"time"

"github.com/algorand/go-algorand/util"
)

// Telemetry Events
Expand Down Expand Up @@ -302,6 +304,8 @@ type PeerConnectionDetails struct {
MessageDelay int64 `json:",omitempty"`
// DuplicateFilterCount is the number of times this peer has sent us a message hash to filter that it had already sent before.
DuplicateFilterCount uint64
// TCP provides connection measurements from TCP.
TCP util.TCPInfo `json:",omitempty"`
}

// CatchpointGenerationEvent event
Expand Down
4 changes: 4 additions & 0 deletions network/limitlistener/rejectingLimitListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ func (l *rejectingLimitListenerConn) Close() error {
l.releaseOnce.Do(l.release)
return err
}

// UnderlyingConn returns the net.Conn stored in the Conn field, so that
// callers can reach the connection this wrapper sits on top of.
func (l *rejectingLimitListenerConn) UnderlyingConn() net.Conn {
return l.Conn
}
4 changes: 4 additions & 0 deletions network/requestTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ type requestTrackedConnection struct {
tracker *RequestTracker
}

// UnderlyingConn returns the net.Conn stored in the Conn field, so that
// callers can reach the connection this tracked connection sits on top of.
func (c *requestTrackedConnection) UnderlyingConn() net.Conn {
return c.Conn
}

// Close removes the connection from the tracker's connections map and calls the underlying Close function.
func (c *requestTrackedConnection) Close() error {
c.tracker.hostRequestsMu.Lock()
Expand Down
24 changes: 22 additions & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,14 +1805,23 @@ func (wn *WebsocketNetwork) OnNetworkAdvance() {
// to the telemetry server. Internally, it's using a timer to ensure that it would only
// send the information once every hour ( configurable via PeerConnectionsUpdateInterval )
func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
	// Nothing to report when telemetry is switched off.
	if !wn.log.GetTelemetryEnabled() {
		return
	}

	now := time.Now()
	interval := wn.config.PeerConnectionsUpdateInterval
	if interval <= 0 {
		// A non-positive interval means the periodic report is never sent.
		return
	}
	nextSend := wn.lastPeerConnectionsSent.Add(time.Duration(interval) * time.Second)
	if nextSend.After(now) {
		// The previous report is still within the update interval.
		return
	}
	// Record the send time before gathering details so the interval is
	// measured from the start of this report.
	wn.lastPeerConnectionsSent = now

	// Only the peer list is needed; the second return value of peerSnapshot
	// is deliberately ignored.
	var snapshot []*wsPeer
	snapshot, _ = wn.peerSnapshot(snapshot)
	details := wn.getPeerConnectionTelemetryDetails(now, snapshot)
	wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.PeerConnectionsEvent, details)
}

func (wn *WebsocketNetwork) getPeerConnectionTelemetryDetails(now time.Time, peers []*wsPeer) telemetryspec.PeersConnectionDetails {
var connectionDetails telemetryspec.PeersConnectionDetails
for _, peer := range peers {
connDetail := telemetryspec.PeerConnectionDetails{
Expand All @@ -1821,6 +1830,18 @@ func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
InstanceName: peer.InstanceName,
DuplicateFilterCount: peer.duplicateFilterCount,
}
// unwrap websocket.Conn, requestTrackedConnection, rejectingLimitListenerConn
var uconn net.Conn = peer.conn.UnderlyingConn()
for i := 0; i < 10; i++ {
wconn, ok := uconn.(wrappedConn)
if !ok {
break
}
uconn = wconn.UnderlyingConn()
}
if tcpInfo, err := util.GetConnTCPInfo(uconn); err == nil && tcpInfo != nil {
connDetail.TCP = *tcpInfo
}
if peer.outgoing {
connDetail.Address = justHost(peer.conn.RemoteAddr().String())
connDetail.Endpoint = peer.GetAddress()
Expand All @@ -1831,8 +1852,7 @@ func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
connectionDetails.IncomingPeers = append(connectionDetails.IncomingPeers, connDetail)
}
}

wn.log.EventWithDetails(telemetryspec.Network, telemetryspec.PeerConnectionsEvent, connectionDetails)
return connectionDetails
}

// prioWeightRefreshTime controls how often we refresh the weights
Expand Down
218 changes: 108 additions & 110 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -268,24 +269,30 @@ func netStop(t testing.TB, wn *WebsocketNetwork, name string) {
t.Logf("%s done", name)
}

// Set up two nodes, test that a.Broadcast is received by B
func TestWebsocketNetworkBasic(t *testing.T) {
partitiontest.PartitionTest(t)
func setupWebsocketNetworkAB(t *testing.T, countTarget int) (*WebsocketNetwork, *WebsocketNetwork, *messageCounterHandler, func()) {
success := false

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
defer func() {
if !success {
netStop(t, netA, "A")
}
}()
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
counterDone := counter.done
defer func() {
if !success {
netStop(t, netB, "B")
}
}()
counter := newMessageCounter(t, countTarget)
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
Expand All @@ -294,6 +301,21 @@ func TestWebsocketNetworkBasic(t *testing.T) {
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

success = true
// closeFunc stops both nodes. Once setup has succeeded the deferred cleanups
// above no longer run, so this is the only place either node is stopped; it
// must stop netA as well as netB, otherwise node A is left running after the
// test finishes.
closeFunc := func() {
	netStop(t, netB, "B")
	netStop(t, netA, "A")
}
return netA, netB, counter, closeFunc
}

// Set up two nodes, test that a.Broadcast is received by B
func TestWebsocketNetworkBasic(t *testing.T) {
partitiontest.PartitionTest(t)

netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()
counterDone := counter.done
netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil)
netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil)

Expand Down Expand Up @@ -384,27 +406,9 @@ func TestWebsocketProposalPayloadCompression(t *testing.T) {
func TestWebsocketNetworkUnicast(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

require.Equal(t, 1, len(netA.peers))
require.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
Expand All @@ -425,26 +429,8 @@ func TestWebsocketNetworkUnicast(t *testing.T) {
func TestWebsocketPeerData(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 2)
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")
netA, _, _, closeFunc := setupWebsocketNetworkAB(t, 2)
defer closeFunc()

require.Equal(t, 1, len(netA.peers))
require.Equal(t, 1, len(netA.GetPeers(PeersConnectedIn)))
Expand All @@ -463,27 +449,9 @@ func TestWebsocketPeerData(t *testing.T) {
func TestWebsocketNetworkArray(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 3)
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 3)
defer closeFunc()
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag}
data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")}
Expand All @@ -500,27 +468,9 @@ func TestWebsocketNetworkArray(t *testing.T) {
func TestWebsocketNetworkCancel(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t)
netA.config.GossipFanout = 1
netA.Start()
defer netStop(t, netA, "A")
netB := makeTestWebsocketNode(t)
netB.config.GossipFanout = 1
addrA, postListen := netA.Address()
require.True(t, postListen)
t.Log(addrA)
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
netB.Start()
defer netStop(t, netB, "B")
counter := newMessageCounter(t, 100)
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 100)
defer closeFunc()
counterDone := counter.done
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counter}})

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
t.Log("a ready")
waitReady(t, netB, readyTimeout.C)
t.Log("b ready")

tags := make([]protocol.Tag, 100)
data := make([][]byte, 100)
Expand Down Expand Up @@ -721,29 +671,15 @@ func TestAddrToGossipAddr(t *testing.T) {

type nopConn struct{}

func (nc *nopConn) RemoteAddr() net.Addr {
return nil
}
func (nc *nopConn) NextReader() (int, io.Reader, error) {
return 0, nil, nil
}
func (nc *nopConn) WriteMessage(int, []byte) error {
return nil
}
func (nc *nopConn) WriteControl(int, []byte, time.Time) error {
return nil
}
func (nc *nopConn) SetReadLimit(limit int64) {
}
func (nc *nopConn) CloseWithoutFlush() error {
return nil
}
func (nc *nopConn) SetPingHandler(h func(appData string) error) {

}
func (nc *nopConn) SetPongHandler(h func(appData string) error) {

}
func (nc *nopConn) RemoteAddr() net.Addr { return nil }
func (nc *nopConn) NextReader() (int, io.Reader, error) { return 0, nil, nil }
func (nc *nopConn) WriteMessage(int, []byte) error { return nil }
func (nc *nopConn) WriteControl(int, []byte, time.Time) error { return nil }
func (nc *nopConn) SetReadLimit(limit int64) {}
func (nc *nopConn) CloseWithoutFlush() error { return nil }
func (nc *nopConn) SetPingHandler(h func(appData string) error) {}
func (nc *nopConn) SetPongHandler(h func(appData string) error) {}
func (nc *nopConn) UnderlyingConn() net.Conn { return nil }

var nopConnSingleton = nopConn{}

Expand Down Expand Up @@ -2739,3 +2675,65 @@ func TestPreparePeerData(t *testing.T) {
}
}
}

// TestWebsocketNetworkTelemetryTCP checks the TCP section of the peer
// connection telemetry details: a non-zero RTT is expected on both ends of a
// live connection, and a zero RTT once the networks have been stopped.
func TestWebsocketNetworkTelemetryTCP(t *testing.T) {
	partitiontest.PartitionTest(t)

	// Bring up a connected pair of nodes; B's handler counts two messages.
	netA, netB, counter, closeFunc := setupWebsocketNetworkAB(t, 2)
	shutDown := false
	defer func() {
		// The pair is stopped explicitly further down; only clean up here
		// when the test bailed out before reaching that point.
		if shutDown {
			return
		}
		closeFunc()
	}()

	// toJSON asserts that the details serialize and returns the encoded text.
	toJSON := func(details interface{}) string {
		encoded, err := json.Marshal(details)
		assert.NoError(t, err)
		return string(encoded)
	}

	// Push two messages from A to B so the connection carries traffic.
	done := counter.done
	netA.Broadcast(context.Background(), protocol.TxnTag, []byte("foo"), false, nil)
	netA.Broadcast(context.Background(), protocol.TxnTag, []byte("bar"), false, nil)

	select {
	case <-time.After(2 * time.Second):
		t.Errorf("timeout, count=%d, wanted 2", counter.count)
	case <-done:
	}

	// Collect telemetry details on both ends while the connection is live
	// and expect a non-zero RTT from each side.
	var peersA, peersB []*wsPeer
	peersA, _ = netA.peerSnapshot(peersA)
	liveA := netA.getPeerConnectionTelemetryDetails(time.Now(), peersA)
	peersB, _ = netB.peerSnapshot(peersB)
	liveB := netB.getPeerConnectionTelemetryDetails(time.Now(), peersB)
	require.Len(t, liveA.IncomingPeers, 1)
	assert.NotZero(t, liveA.IncomingPeers[0].TCP.RTT)
	require.Len(t, liveB.OutgoingPeers, 1)
	assert.NotZero(t, liveB.OutgoingPeers[0].TCP.RTT)

	liveJSONA := toJSON(liveA)
	liveJSONB := toJSON(liveB)
	t.Log("detailsA", liveJSONA)
	t.Log("detailsB", liveJSONB)

	// Stop the first pair, then start a second pair so new file descriptors
	// are opened while the stale peer snapshots are still held.
	closeFunc()
	shutDown = true
	_, _, _, closeSecondPair := setupWebsocketNetworkAB(t, 2)
	defer closeSecondPair()

	// Querying through the stale snapshots of the closed networks is expected
	// to fail with a *net.OpError "use of closed network connection", which
	// leaves the RTT values at zero.
	staleA := netA.getPeerConnectionTelemetryDetails(time.Now(), peersA)
	staleB := netB.getPeerConnectionTelemetryDetails(time.Now(), peersB)
	require.Len(t, staleA.IncomingPeers, 1)
	assert.Zero(t, staleA.IncomingPeers[0].TCP.RTT)
	require.Len(t, staleB.OutgoingPeers, 1)
	assert.Zero(t, staleB.OutgoingPeers[0].TCP.RTT)

	staleJSONA := toJSON(staleA)
	staleJSONB := toJSON(staleB)
	t.Log("closed detailsA", staleJSONA)
	t.Log("closed detailsB", staleJSONB)
}
5 changes: 5 additions & 0 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ type wsPeerWebsocketConn interface {
CloseWithoutFlush() error
SetPingHandler(h func(appData string) error)
SetPongHandler(h func(appData string) error)
wrappedConn
}

// wrappedConn is satisfied by any connection type that can hand back the
// net.Conn it wraps through UnderlyingConn.
type wrappedConn interface {
UnderlyingConn() net.Conn
}

type sendMessage struct {
Expand Down
Loading