Skip to content

Commit

Permalink
[USMON-1425] usm: Fix sketches object pool cleanup order (#34959)
Browse files Browse the repository at this point in the history
  • Loading branch information
guyarb authored Mar 11, 2025
1 parent 1f1e36c commit 7df95a6
Show file tree
Hide file tree
Showing 11 changed files with 577 additions and 150 deletions.
18 changes: 12 additions & 6 deletions cmd/system-probe/modules/network_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {
httpMux.HandleFunc("/connections", utils.WithConcurrencyLimit(utils.DefaultMaxConcurrentRequests, func(w http.ResponseWriter, req *http.Request) {
start := time.Now()
id := getClientID(req)
cs, err := nt.tracer.GetActiveConnections(id)
cs, cleanup, err := nt.tracer.GetActiveConnections(id)
if err != nil {
log.Errorf("unable to retrieve connections: %s", err)
w.WriteHeader(500)
return
}
defer cleanup()
contentType := req.Header.Get("Accept")
marshaler := marshal.GetMarshaler(contentType)
writeConnections(w, marshaler, cs)
Expand Down Expand Up @@ -157,12 +158,13 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {
return
}
id := getClientID(req)
cs, err := nt.tracer.GetActiveConnections(id)
cs, cleanup, err := nt.tracer.GetActiveConnections(id)
if err != nil {
log.Errorf("unable to retrieve connections: %s", err)
w.WriteHeader(500)
return
}
defer cleanup()

utils.WriteAsJSON(w, httpdebugging.HTTP(cs.HTTP, cs.DNS))
})
Expand All @@ -173,12 +175,13 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {
return
}
id := getClientID(req)
cs, err := nt.tracer.GetActiveConnections(id)
cs, cleanup, err := nt.tracer.GetActiveConnections(id)
if err != nil {
log.Errorf("unable to retrieve connections: %s", err)
w.WriteHeader(500)
return
}
defer cleanup()

utils.WriteAsJSON(w, kafkadebugging.Kafka(cs.Kafka))
})
Expand All @@ -189,12 +192,13 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {
return
}
id := getClientID(req)
cs, err := nt.tracer.GetActiveConnections(id)
cs, cleanup, err := nt.tracer.GetActiveConnections(id)
if err != nil {
log.Errorf("unable to retrieve connections: %s", err)
w.WriteHeader(500)
return
}
defer cleanup()

utils.WriteAsJSON(w, postgresdebugging.Postgres(cs.Postgres))
})
Expand All @@ -205,12 +209,13 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {
return
}
id := getClientID(req)
cs, err := nt.tracer.GetActiveConnections(id)
cs, cleanup, err := nt.tracer.GetActiveConnections(id)
if err != nil {
log.Errorf("unable to retrieve connections: %s", err)
w.WriteHeader(500)
return
}
defer cleanup()

utils.WriteAsJSON(w, redisdebugging.Redis(cs.Redis))
})
Expand All @@ -221,12 +226,13 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {
return
}
id := getClientID(req)
cs, err := nt.tracer.GetActiveConnections(id)
cs, cleanup, err := nt.tracer.GetActiveConnections(id)
if err != nil {
log.Errorf("unable to retrieve connections: %s", err)
w.WriteHeader(500)
return
}
defer cleanup()

utils.WriteAsJSON(w, httpdebugging.HTTP(cs.HTTP2, cs.DNS))
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/network/nettop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func main() {

printConns := func(now time.Time) {
fmt.Printf("-- %s --\n", now)
cs, err := t.GetActiveConnections(fmt.Sprintf("%d", os.Getpid()))
cs, cleanup, err := t.GetActiveConnections(fmt.Sprintf("%d", os.Getpid()))
if err != nil {
fmt.Println(err)
}
defer cleanup()
for _, c := range cs.Conns {
fmt.Println(network.ConnectionSummary(&c, cs.DNS))
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/network/protocols/http2/testutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

//go:build linux && test

package http2

import (
"context"
"io"
"net"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/DataDog/datadog-agent/pkg/network/protocols/http/testutil"
)

// StartH2CServer starts a new HTTP/2 server with the given address and returns a function to stop it.
func StartH2CServer(t *testing.T, address string, isTLS bool) func() {
srv := &http.Server{
Addr: address,
Handler: h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
statusCode := testutil.StatusFromPath(r.URL.Path)
if statusCode == 0 {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(int(statusCode))
}
defer func() { _ = r.Body.Close() }()
_, _ = io.Copy(w, r.Body)
}), &http2.Server{}),
IdleTimeout: 2 * time.Second,
}

require.NoError(t, http2.ConfigureServer(srv, nil), "could not configure server")

l, err := net.Listen("tcp", address)
require.NoError(t, err, "could not listen")

if isTLS {
cert, key, err := testutil.GetCertsPaths()
require.NoError(t, err, "could not get certs paths")
go func() {
if err := srv.ServeTLS(l, cert, key); err != http.ErrServerClosed {
require.NoError(t, err, "could not serve TLS")
}
}()
} else {
go func() {
if err := srv.Serve(l); err != http.ErrServerClosed {
require.NoError(t, err, "could not serve")
}
}()
}

return func() { _ = srv.Shutdown(context.Background()) }
}
9 changes: 4 additions & 5 deletions pkg/network/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (t *Tracer) Stop() {
}

// GetActiveConnections returns the delta for connection info from the last time it was called with the same clientID
func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, error) {
func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, func(), error) {
t.bufferLock.Lock()
defer t.bufferLock.Unlock()
if log.ShouldLog(log.TraceLvl) {
Expand All @@ -430,11 +430,10 @@ func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, er
buffer := network.ClientPool.Get(clientID)
latestTime, active, err := t.getConnections(buffer.ConnectionBuffer)
if err != nil {
return nil, fmt.Errorf("error retrieving connections: %s", err)
return nil, nil, fmt.Errorf("error retrieving connections: %s", err)
}

usmStats, cleaners := t.usmMonitor.GetProtocolStats()
defer cleaners()
usmStats, cleanup := t.usmMonitor.GetProtocolStats()
delta := t.state.GetDelta(clientID, latestTime, active, t.reverseDNS.GetDNSStats(), usmStats)

ips := make(map[util.Address]struct{}, len(delta.Conns)/2)
Expand Down Expand Up @@ -469,7 +468,7 @@ func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, er
conns.PrebuiltAssets = netebpf.GetModulesInUse()
t.lastCheck.Store(time.Now().Unix())

return conns, nil
return conns, cleanup, nil
}

// RegisterClient registers a clientID with the tracer
Expand Down
Loading

0 comments on commit 7df95a6

Please sign in to comment.