Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: Allow short-lived connections to query /status endpoint when at full capacity #6009

Merged
2 changes: 1 addition & 1 deletion config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Local struct {
LogArchiveDir string `version[31]:""`

// IncomingConnectionsLimit specifies the max number of incoming connections
// for the port configured in NetAddress. 0 means no connections allowed. Must be non-negative.
// for the gossip protocol configured in NetAddress. 0 means no connections allowed. Must be non-negative.
// Estimating 1.5MB per incoming connection, 1.5MB*2400 = 3.6GB
IncomingConnectionsLimit int `version[0]:"-1" version[1]:"10000" version[17]:"800" version[27]:"2400"`

Expand Down
3 changes: 2 additions & 1 deletion daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/network/limitlistener"
"github.com/algorand/go-algorand/node"
"github.com/algorand/go-algorand/util"
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes

if cfg.IsGossipServer() {
var ot basics.OverflowTracker
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit))
fdRequired = ot.Add(fdRequired, uint64(cfg.IncomingConnectionsLimit)+network.ReservedHealthServiceConnections)
if ot.Overflowed {
return errors.New("Initialize() overflowed when adding up IncomingConnectionsLimit to the existing RLIMIT_NOFILE value; decrease RestConnectionsHardLimit or IncomingConnectionsLimit")
}
Expand Down
24 changes: 20 additions & 4 deletions network/requestTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,13 +486,30 @@ func (rt *RequestTracker) GetRequestConnection(request *http.Request) net.Conn {
}

func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.Request) {
// this function is called only after we've fetched all the headers. on some malicious clients, this could get delayed, so we can't rely on the
// tcp-connection established time to align with current time.
rateLimitingWindowStartTime := time.Now().Add(-time.Duration(rt.config.ConnectionsRateLimitingWindowSeconds) * time.Second)
// Check if the number of connections exceeds the limit
rt.httpConnectionsMu.Lock()
acceptedConnections := len(rt.acceptedConnections)
rt.httpConnectionsMu.Unlock()

healthServiceInvocation := request.URL.Path == HealthServiceStatusPath

// get the connection local address. Note that it's the interface of a immutable object, so it will be unique and matching the original connection interface.
localAddr := request.Context().Value(http.LocalAddrContextKey).(net.Addr)

if acceptedConnections > rt.config.IncomingConnectionsLimit && !healthServiceInvocation {
// If the limit is exceeded, reject the connection
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "rt_incoming_connection_limit"})
rt.log.EventWithDetails(telemetryspec.Network, telemetryspec.ConnectPeerFailEvent,
telemetryspec.ConnectPeerFailEventDetails{
Address: localAddr.String(), Incoming: true, Reason: "RequestTracker Connection Limit"})
response.WriteHeader(http.StatusServiceUnavailable)
return
}

// this function is called only after we've fetched all the headers. on some malicious clients, this could get delayed, so we can't rely on the
// tcp-connection established time to align with current time.
rateLimitingWindowStartTime := time.Now().Add(-time.Duration(rt.config.ConnectionsRateLimitingWindowSeconds) * time.Second)

rt.hostRequestsMu.Lock()
trackedRequest := rt.acceptedConnections[localAddr]
if trackedRequest != nil {
Expand Down Expand Up @@ -550,7 +567,6 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.

// send the request downstream; in our case, it would go to the router.
rt.downstreamHandler.ServeHTTP(response, request)

}

// remoteHostProxyFix updates the origin IP address in the trackedRequest
Expand Down
10 changes: 9 additions & 1 deletion network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ const testingPublicAddress = "testing"
// Maximum number of bytes to read from a header when trying to establish a websocket connection.
const wsMaxHeaderBytes = 4096

// ReservedHealthServiceConnections reserves additional connections for the health check endpoint. This reserves
// capacity to query the health check service when a node is serving maximum peers. The file descriptors will be
// used from the ReservedFDs pool, as this pool is meant for short-lived usage (dns queries, disk i/o, etc.)
const ReservedHealthServiceConnections = 10

var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)

Expand Down Expand Up @@ -151,6 +156,9 @@ const peerShutdownDisconnectionAckDuration = 50 * time.Millisecond
// Contains {genesisID} param to be handled by gorilla/mux
const GossipNetworkPath = "/v1/{genesisID}/gossip"

// HealthServiceStatusPath is the path to register HealthService as a handler for when using gorilla/mux
const HealthServiceStatusPath = "/status"

// NodeInfo helps the network get information about the node it is running on
type NodeInfo interface {
// IsParticipating returns true if this node has stake and may vote on blocks or propose blocks.
Expand Down Expand Up @@ -684,7 +692,7 @@ func (wn *WebsocketNetwork) Start() {
}
// wrap the original listener with a limited connection listener
listener = limitlistener.RejectingLimitListener(
listener, uint64(wn.config.IncomingConnectionsLimit), wn.log)
listener, uint64(wn.config.IncomingConnectionsLimit)+ReservedHealthServiceConnections, wn.log)
// wrap the limited connection listener with a requests tracker listener
wn.listener = wn.requestsTracker.Listener(listener)
wn.log.Debugf("listening on %s", wn.listener.Addr().String())
Expand Down
8 changes: 3 additions & 5 deletions rpcs/healthService.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package rpcs

import (
"github.com/algorand/go-algorand/network"
"net/http"
)

// HealthServiceStatusPath is the path to register HealthService as a handler for when using gorilla/mux
const HealthServiceStatusPath = "/status"
"github.com/algorand/go-algorand/network"
)

// HealthService is a service that provides health information endpoints for the node
type HealthService struct{}
Expand All @@ -31,7 +29,7 @@ type HealthService struct{}
func MakeHealthService(net network.GossipNode) HealthService {
service := HealthService{}

net.RegisterHTTPHandler(HealthServiceStatusPath, service)
net.RegisterHTTPHandler(network.HealthServiceStatusPath, service)

return service
}
Expand Down
9 changes: 5 additions & 4 deletions rpcs/healthService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package rpcs

import (
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
"io"
"net/http"
"path"
"testing"

"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

func TestHealthService_ServeHTTP(t *testing.T) {
Expand All @@ -40,7 +41,7 @@ func TestHealthService_ServeHTTP(t *testing.T) {

client := http.Client{}

parsedURL.Path = path.Join(parsedURL.Path, HealthServiceStatusPath)
parsedURL.Path = path.Join(parsedURL.Path, network.HealthServiceStatusPath)

response, err := client.Get(parsedURL.String())
require.NoError(t, err)
Expand Down
Loading