Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Feb 3, 2025
1 parent 3a951e1 commit cc7e359
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 46 deletions.
26 changes: 20 additions & 6 deletions p2p/net/simconn/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package simconn

import (
"errors"
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -73,21 +74,28 @@ func (f *simpleNodeFirewall) IsPacketInAllowed(p Packet) bool {
}

type SimpleFirewallRouter struct {
nodes map[net.Addr]*simpleNodeFirewall
nodes map[string]*simpleNodeFirewall
}

func (r *SimpleFirewallRouter) String() string {
return fmt.Sprintf("%+v", r.nodes)
}

func (r *SimpleFirewallRouter) SendPacket(deadline time.Time, p Packet) error {
toNode, exists := r.nodes[p.To]
toNode, exists := r.nodes[p.To.String()]
if !exists {
return errors.New("unknown destination")
}

// Record that this node is sending a packet to the destination
fromNode, exists := r.nodes[p.From]
fromNode, exists := r.nodes[p.From.String()]
if !exists {
return errors.New("unknown source")
}
fromNode.mu.Lock()
if fromNode.packetsOutTo == nil {
fromNode.packetsOutTo = make(map[string]struct{})
}
fromNode.packetsOutTo[p.To.String()] = struct{}{}
fromNode.mu.Unlock()

Expand All @@ -100,21 +108,27 @@ func (r *SimpleFirewallRouter) SendPacket(deadline time.Time, p Packet) error {
}

func (r *SimpleFirewallRouter) AddNode(addr net.Addr, conn *SimConn) {
r.nodes[addr] = &simpleNodeFirewall{
if r.nodes == nil {
r.nodes = make(map[string]*simpleNodeFirewall)
}
r.nodes[addr.String()] = &simpleNodeFirewall{
packetsOutTo: make(map[string]struct{}),
node: conn,
}
}

func (r *SimpleFirewallRouter) AddPublicNode(addr net.Addr, conn *SimConn) {
r.nodes[addr] = &simpleNodeFirewall{
r.nodes[addr.String()] = &simpleNodeFirewall{
public: true,
node: conn,
}
}

func (r *SimpleFirewallRouter) RemoveNode(addr net.Addr) {
delete(r.nodes, addr)
if r.nodes == nil {
return
}
delete(r.nodes, addr.String())
}

var _ Router = &SimpleFirewallRouter{}
2 changes: 1 addition & 1 deletion p2p/net/simconn/simconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestSimConnDeadlinesWithLatency(t *testing.T) {

func TestSimpleHolePunch(t *testing.T) {
router := &SimpleFirewallRouter{
nodes: make(map[net.Addr]*simpleNodeFirewall),
nodes: make(map[string]*simpleNodeFirewall),
}

// Create two peers
Expand Down
122 changes: 96 additions & 26 deletions p2p/protocol/holepunch/holepunch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package holepunch_test

import (
"context"
"fmt"
"net"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -14,12 +15,14 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/simconn"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"

"github.com/libp2p/go-msgio/pbio"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -130,15 +133,34 @@ func TestDirectDialWorks(t *testing.T) {
}

func TestEndToEndSimConnect(t *testing.T) {
t.Skip("This test is broken. It is hard to do an end-to-end test without mocking the separate networks that holepunching is supposed to connect. It only worked previously because one of the hosts was able to learn about a non-holepunched direct connection via identify.")

h1tr := &mockEventTracer{}
h2tr := &mockEventTracer{}
h1, h2, relay, _ := makeRelayedHosts(t, []holepunch.Option{holepunch.WithTracer(h1tr)}, []holepunch.Option{holepunch.WithTracer(h2tr)}, true)
defer h1.Close()
defer h2.Close()
defer relay.Close()

// time.Sleep(100 * time.Millisecond)
p1 := ping.NewPingService(h1)
require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
ID: relay.ID(),
Addrs: relay.Addrs(),
}))
// var raddr ma.Multiaddr
// for _, a := range h2.Addrs() {
// if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
// raddr = a
// break
// }
// }
// require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
// ID: h2.ID(),
// Addrs: []ma.Multiaddr{raddr},
// }))
res := p1.Ping(network.WithAllowLimitedConn(context.Background(), "test"), h2.ID())
result := <-res
require.NoError(t, result.Error)

// wait till a direct connection is complete
ensureDirectConn(t, h1, h2)
// ensure no hole-punching streams are open on either side
Expand All @@ -147,14 +169,15 @@ func TestEndToEndSimConnect(t *testing.T) {
require.Eventually(t,
func() bool {
h2Events = h2tr.getEvents()
return len(h2Events) == 3
return len(h2Events) == 4
},
time.Second,
10*time.Millisecond,
100*time.Millisecond,
)
require.Equal(t, holepunch.StartHolePunchEvtT, h2Events[0].Type)
require.Equal(t, holepunch.HolePunchAttemptEvtT, h2Events[1].Type)
require.Equal(t, holepunch.EndHolePunchEvtT, h2Events[2].Type)
require.Equal(t, holepunch.DirectDialEvtT, h2Events[0].Type)
require.Equal(t, holepunch.StartHolePunchEvtT, h2Events[1].Type)
require.Equal(t, holepunch.HolePunchAttemptEvtT, h2Events[2].Type)
require.Equal(t, holepunch.EndHolePunchEvtT, h2Events[3].Type)

h1Events := h1tr.getEvents()
// We don't really expect a hole-punched connection to be established in this test,
Expand Down Expand Up @@ -230,7 +253,7 @@ func TestFailuresOnInitiator(t *testing.T) {
opts = append(opts, holepunch.WithAddrFilter(f))
}

hps := addHolePunchService(t, h2, opts...)
hps := addHolePunchService(t, h2, nil, opts...)
// wait until the hole punching protocol has actually started
require.Eventually(t, func() bool {
protos, _ := h2.Peerstore().SupportsProtocols(h1.ID(), holepunch.Protocol)
Expand Down Expand Up @@ -409,7 +432,7 @@ func ensureDirectConn(t *testing.T, h1, h2 host.Host) {
}, 5*time.Second, 50*time.Millisecond)
}

func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
func mkHostWithStaticAutoRelay(t *testing.T, ipAddr string, port int, relay host.Host, router *simconn.SimpleFirewallRouter) host.Host {
if race.WithRace() {
t.Skip("modifying manet.Private4 is racy")
}
Expand All @@ -418,16 +441,13 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
Addrs: relay.Addrs(),
}

cpy := manet.Private4
manet.Private4 = []*net.IPNet{}
defer func() { manet.Private4 = cpy }()

h, err := libp2p.New(
libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")),
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", ipAddr, port))),
libp2p.EnableRelay(),
libp2p.EnableAutoRelayWithStaticRelays([]peer.AddrInfo{pi}),
libp2p.ForceReachabilityPrivate(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(false, router),
)
require.NoError(t, err)

Expand All @@ -443,23 +463,58 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
return h
}

var lastPort atomic.Uint32

type MockSourceIPSelector struct {
ip atomic.Pointer[net.IP]
}

func (m *MockSourceIPSelector) PreferredSourceIPForDestination(dst *net.UDPAddr) (net.IP, error) {
return *m.ip.Load(), nil
}

func quicReuseOpts(isPublic bool, router *simconn.SimpleFirewallRouter) libp2p.Option {
m := &MockSourceIPSelector{}
return libp2p.QUICReuse(
quicreuse.NewConnManager,
quicreuse.CustomSourceIPSelector(func() (quicreuse.SourceIPSelector, error) {
return m, nil
}),
quicreuse.CustomListenUDP(func(network string, address *net.UDPAddr) (net.PacketConn, error) {
m.ip.Store(&address.IP)
if address.Port == 0 {
address.Port = int(lastPort.Add(1))
}
c := simconn.NewSimConn(address, router)
if isPublic {
router.AddPublicNode(address, c)
} else {
router.AddNode(address, c)
}
return c, nil
}))
}

func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
t.Helper()
h1, _ = mkHostWithHolePunchSvc(t, h1opt...)
router := &simconn.SimpleFirewallRouter{}
h1, _ = mkHostWithHolePunchSvc2(t, "2.0.0.1", 8001, router, h1opt...)
var err error

relay, err = libp2p.New(
libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
libp2p.DisableRelay(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(true, router),
)
require.NoError(t, err)
_, err = relayv2.New(relay)
require.NoError(t, err)

// make sure the relay service is started and advertised by Identify
h, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.Transport(tcp.NewTCPTransport),
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.2/udp/8000/quic-v1")),
quicReuseOpts(false, router),
libp2p.DisableRelay(),
)
require.NoError(t, err)
Expand All @@ -470,9 +525,9 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
return err == nil && len(supported) > 0
}, 3*time.Second, 100*time.Millisecond)

h2 = mkHostWithStaticAutoRelay(t, relay)
h2 = mkHostWithStaticAutoRelay(t, "2.0.0.2", 8002, relay, router)
if addHolePuncher {
hps = addHolePunchService(t, h2, h2opt...)
hps = addHolePunchService(t, h2, []ma.Multiaddr{ma.StringCast("/ip4/2.0.0.2/udp/8002/quic-v1")}, h2opt...)
}

// h2 has a relay addr
Expand All @@ -492,12 +547,14 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
return
}

func addHolePunchService(t *testing.T, h host.Host, opts ...holepunch.Option) *holepunch.Service {
func addHolePunchService(t *testing.T, h host.Host, extraAddrs []ma.Multiaddr, opts ...holepunch.Option) *holepunch.Service {
t.Helper()
hps, err := holepunch.NewService(h, newMockIDService(t, h), func() []ma.Multiaddr {
addrs := h.Addrs()
addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) })
return append(addrs, ma.StringCast("/ip4/1.2.3.4/tcp/1234"))
addrs = append(addrs, extraAddrs...)
return addrs
// addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) })
// return append(addrs, ma.StringCast("/ip4/1.2.3.4/tcp/1234"))
}, opts...)
require.NoError(t, err)
return hps
Expand All @@ -511,6 +568,19 @@ func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host,
libp2p.ResourceManager(&network.NullResourceManager{}),
)
require.NoError(t, err)
hps := addHolePunchService(t, h, opts...)
hps := addHolePunchService(t, h, nil, opts...)
return h, hps
}

func mkHostWithHolePunchSvc2(t *testing.T, ipAddr string, port int, router *simconn.SimpleFirewallRouter, opts ...holepunch.Option) (host.Host, *holepunch.Service) {
t.Helper()
h, err := libp2p.New(
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", ipAddr, port))),
libp2p.ForceReachabilityPrivate(),
libp2p.ResourceManager(&network.NullResourceManager{}),
quicReuseOpts(false, router),
)
require.NoError(t, err)
hps := addHolePunchService(t, h, nil, opts...)
return h, hps
}
7 changes: 6 additions & 1 deletion p2p/protocol/holepunch/holepuncher.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (hp *holePuncher) beginDirectConnect(p peer.ID) error {
// It first attempts a direct dial (if we have a public address of that peer), and then
// coordinates a hole punch over the given relay connection.
func (hp *holePuncher) DirectConnect(p peer.ID) error {
log.Debugw("beginDirectConnect", "host", hp.host.ID(), "peer", p)
if err := hp.beginDirectConnect(p); err != nil {
return err
}
Expand All @@ -107,14 +108,17 @@ func (hp *holePuncher) DirectConnect(p peer.ID) error {
func (hp *holePuncher) directConnect(rp peer.ID) error {
// short-circuit check to see if we already have a direct connection
if getDirectConnection(hp.host, rp) != nil {
log.Debugw("already connected", "host", hp.host.ID(), "peer", rp)
return nil
}

log.Debugw("attempting direct dial", "host", hp.host.ID(), "peer", rp, "addrs", hp.host.Peerstore().Addrs(rp))
// short-circuit hole punching if a direct dial works.
// attempt a direct connection ONLY if we have a public address for the remote peer
for _, a := range hp.host.Peerstore().Addrs(rp) {
if !isRelayAddress(a) && manet.IsPublicAddr(a) {
forceDirectConnCtx := network.WithForceDirectDial(hp.ctx, "hole-punching")
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout)
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, 100*time.Millisecond)

tstart := time.Now()
// This dials *all* addresses, public and private, from the peerstore.
Expand Down Expand Up @@ -186,6 +190,7 @@ func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multi
return nil, nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
}
defer str.Close()
log.Debugf("initiateHolePunch: %s, %s", str.Conn().RemotePeer(), str.Conn().RemoteMultiaddr())

addr, obsAddr, rtt, err := hp.initiateHolePunchImpl(str)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions p2p/protocol/holepunch/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewService(h host.Host, ids identify.IDService, listenAddrs func() []ma.Mul
func (s *Service) waitForPublicAddr() {
defer s.refCount.Done()

log.Debug("waiting until we have at least one public address", "peer", s.host.ID())
log.Debugw("waiting until we have at least one public address", "peer", s.host.ID())

// TODO: We should have an event here that fires when identify discovers a new
// address.
Expand All @@ -114,7 +114,7 @@ func (s *Service) waitForPublicAddr() {
defer t.Stop()
for {
if len(s.listenAddrs()) > 0 {
log.Debug("Host now has a public address. Starting holepunch protocol.")
log.Debugf("Host %s now has a public address. Starting holepunch protocol.", s.host.ID())
s.host.SetStreamHandler(Protocol, s.handleNewStream)
break
}
Expand Down Expand Up @@ -185,12 +185,14 @@ func (s *Service) incomingHolePunch(str network.Stream) (rtt time.Duration, remo

str.SetDeadline(time.Now().Add(StreamTimeout))

log.Debugf("incomingHolePunch reading messsage")
if err := rd.ReadMsg(msg); err != nil {
return 0, nil, nil, fmt.Errorf("failed to read message from initiator: %w", err)
}
if t := msg.GetType(); t != pb.HolePunch_CONNECT {
return 0, nil, nil, fmt.Errorf("expected CONNECT message from initiator but got %d", t)
}
log.Debugf("incomingHolePunch read CONNECT message")

obsDial := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
if s.filter != nil {
Expand Down Expand Up @@ -223,6 +225,8 @@ func (s *Service) incomingHolePunch(str network.Stream) (rtt time.Duration, remo
}

func (s *Service) handleNewStream(str network.Stream) {
log.Debugf("handleNewStream: %s, %s", str.Conn().RemotePeer(), str.Conn().RemoteMultiaddr())

// Check directionality of the underlying connection.
// Peer A receives an inbound connection from peer B.
// Peer A opens a new hole punch stream to peer B.
Expand Down
Loading

0 comments on commit cc7e359

Please sign in to comment.