Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dcutr): Fix end to end tests and add legacy behavior flag (default=true) #3044

Merged
merged 33 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0c0547b
fix: dcutr: fix roles in tcp sim-open
MarcoPolo Nov 15, 2024
603b8dd
Skip end-to-end test for now
MarcoPolo Nov 15, 2024
b8997e6
wip add legacyBehavior bool
MarcoPolo Jan 21, 2025
48d3ce2
Add simconn
MarcoPolo Jan 21, 2025
8321f2b
feat(quicreuse): Support custom listenUDP methods
MarcoPolo Dec 23, 2024
a54163a
allow overrides for quicreuse methods
MarcoPolo Feb 3, 2025
22b9a3b
better logs
MarcoPolo Feb 3, 2025
c598fb7
firewall router should stringify addrs
MarcoPolo Feb 3, 2025
ec3ba1a
add firewall router
MarcoPolo Feb 3, 2025
a135d96
fix unset router nodes
MarcoPolo Feb 4, 2025
592964c
wip
MarcoPolo Feb 4, 2025
672a55a
cleanup e2e test
MarcoPolo Feb 4, 2025
dd71c11
Always emit initial reachability state
MarcoPolo Feb 4, 2025
4f91aae
Remove ForceReachability
MarcoPolo Feb 4, 2025
f0ff150
another log
MarcoPolo Feb 5, 2025
e1b84b9
fixup tests
MarcoPolo Feb 5, 2025
36c2699
Add flag for legacy behavior
MarcoPolo Feb 5, 2025
7cefb30
remove unused code
MarcoPolo Feb 5, 2025
2b3c29a
explicit reachability
MarcoPolo Feb 5, 2025
12565b4
Revert "Always emit initial reachability state"
MarcoPolo Feb 5, 2025
7689110
rename
MarcoPolo Feb 5, 2025
773a8c0
fix race
MarcoPolo Feb 5, 2025
c002e90
fix race in simconn_test
MarcoPolo Feb 6, 2025
370b887
change default direct dial timeout
MarcoPolo Feb 25, 2025
19c0b80
remove override naming convention
MarcoPolo Feb 25, 2025
deafc2d
rename public to publiclyReachable
MarcoPolo Feb 25, 2025
c238b94
nits
MarcoPolo Feb 25, 2025
0ca7365
drop data if whole packet is not read
MarcoPolo Feb 25, 2025
951c076
latency on the write side
MarcoPolo Feb 25, 2025
aa899c7
add MarkPacketSentOut
MarcoPolo Feb 25, 2025
a0d03f4
remove unused port
MarcoPolo Feb 25, 2025
e0a57d7
remove deadline from SendPacket method
MarcoPolo Feb 25, 2025
f0705e4
Update tests
MarcoPolo Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions p2p/net/simconn/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package simconn

import (
"errors"
"fmt"
"net"
"sync"
"time"
)

type PacketReciever interface {
RecvPacket(p Packet)
}

// PerfectRouter is a router that has no latency or jitter and can route to
// every node
type PerfectRouter struct {
mu sync.Mutex
nodes map[net.Addr]PacketReciever
}

// SendPacket implements Router.
func (r *PerfectRouter) SendPacket(p Packet) error {
r.mu.Lock()
defer r.mu.Unlock()
conn, ok := r.nodes[p.To]
if !ok {
return errors.New("unknown destination")
}

conn.RecvPacket(p)
return nil
}

func (r *PerfectRouter) AddNode(addr net.Addr, conn PacketReciever) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
r.nodes = make(map[net.Addr]PacketReciever)
}
r.nodes[addr] = conn
}

func (r *PerfectRouter) RemoveNode(addr net.Addr) {
delete(r.nodes, addr)
}

var _ Router = &PerfectRouter{}

type DelayedPacketReciever struct {
inner PacketReciever
delay time.Duration
}

func (r *DelayedPacketReciever) RecvPacket(p Packet) {
time.AfterFunc(r.delay, func() { r.inner.RecvPacket(p) })
}

type FixedLatencyRouter struct {
PerfectRouter
latency time.Duration
}

func (r *FixedLatencyRouter) SendPacket(p Packet) error {
return r.PerfectRouter.SendPacket(p)
}

func (r *FixedLatencyRouter) AddNode(addr net.Addr, conn PacketReciever) {
r.PerfectRouter.AddNode(addr, &DelayedPacketReciever{
inner: conn,
delay: r.latency,
})
}

var _ Router = &FixedLatencyRouter{}

type simpleNodeFirewall struct {
mu sync.Mutex
publiclyReachable bool
packetsOutTo map[string]struct{}
node *SimConn
}

func (f *simpleNodeFirewall) MarkPacketSentOut(p Packet) {
f.mu.Lock()
defer f.mu.Unlock()
if f.packetsOutTo == nil {
f.packetsOutTo = make(map[string]struct{})
}
f.packetsOutTo[p.To.String()] = struct{}{}
}

func (f *simpleNodeFirewall) IsPacketInAllowed(p Packet) bool {
f.mu.Lock()
defer f.mu.Unlock()
if f.publiclyReachable {
return true
}

_, ok := f.packetsOutTo[p.From.String()]
return ok
}

func (f *simpleNodeFirewall) String() string {
return fmt.Sprintf("public: %v, packetsOutTo: %v", f.publiclyReachable, f.packetsOutTo)
}

type SimpleFirewallRouter struct {
mu sync.Mutex
nodes map[string]*simpleNodeFirewall
}

func (r *SimpleFirewallRouter) String() string {
r.mu.Lock()
defer r.mu.Unlock()
nodes := make([]string, 0, len(r.nodes))
for _, node := range r.nodes {
nodes = append(nodes, node.String())
}
return fmt.Sprintf("%v", nodes)
}

func (r *SimpleFirewallRouter) SendPacket(p Packet) error {
r.mu.Lock()
defer r.mu.Unlock()
toNode, exists := r.nodes[p.To.String()]
if !exists {
return errors.New("unknown destination")
}

// Record that this node is sending a packet to the destination
fromNode, exists := r.nodes[p.From.String()]
if !exists {
return errors.New("unknown source")
}
fromNode.MarkPacketSentOut(p)

if !toNode.IsPacketInAllowed(p) {
return nil // Silently drop blocked packets
}

toNode.node.RecvPacket(p)
return nil
}

func (r *SimpleFirewallRouter) AddNode(addr net.Addr, conn *SimConn) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
r.nodes = make(map[string]*simpleNodeFirewall)
}
r.nodes[addr.String()] = &simpleNodeFirewall{
packetsOutTo: make(map[string]struct{}),
node: conn,
}
}

func (r *SimpleFirewallRouter) AddPubliclyReachableNode(addr net.Addr, conn *SimConn) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
r.nodes = make(map[string]*simpleNodeFirewall)
}
r.nodes[addr.String()] = &simpleNodeFirewall{
publiclyReachable: true,
node: conn,
}
}

func (r *SimpleFirewallRouter) RemoveNode(addr net.Addr) {
r.mu.Lock()
defer r.mu.Unlock()
if r.nodes == nil {
return
}
delete(r.nodes, addr.String())
}

var _ Router = &SimpleFirewallRouter{}
218 changes: 218 additions & 0 deletions p2p/net/simconn/simconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package simconn

import (
"errors"
"net"
"slices"
"sync"
"sync/atomic"
"time"
)

var ErrDeadlineExceeded = errors.New("deadline exceeded")

type Router interface {
SendPacket(p Packet) error
}

type Packet struct {
To net.Addr
From net.Addr
buf []byte
}

type SimConn struct {
mu sync.Mutex
closed bool
closedChan chan struct{}

packetsSent atomic.Uint64
packetsRcvd atomic.Uint64
bytesSent atomic.Int64
bytesRcvd atomic.Int64

router Router

myAddr *net.UDPAddr
myLocalAddr net.Addr
packetsToRead chan Packet

readDeadline time.Time
writeDeadline time.Time
}

// NewSimConn creates a new simulated connection with the specified parameters
func NewSimConn(addr *net.UDPAddr, rtr Router) *SimConn {
return &SimConn{
router: rtr,
myAddr: addr,
packetsToRead: make(chan Packet, 512), // buffered channel to prevent blocking
closedChan: make(chan struct{}),
}
}

type ConnStats struct {
BytesSent int
BytesRcvd int
PacketsSent int
PacketsRcvd int
}

func (c *SimConn) Stats() ConnStats {
return ConnStats{
BytesSent: int(c.bytesSent.Load()),
BytesRcvd: int(c.bytesRcvd.Load()),
PacketsSent: int(c.packetsSent.Load()),
PacketsRcvd: int(c.packetsRcvd.Load()),
}
}

// SetLocalAddr only changes what `.LocalAddr()` returns.
// Packets will still come From the initially configured addr.
func (c *SimConn) SetLocalAddr(addr net.Addr) {
c.myLocalAddr = addr
}

func (c *SimConn) RecvPacket(p Packet) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return
}
c.mu.Unlock()
c.packetsRcvd.Add(1)
c.bytesRcvd.Add(int64(len(p.buf)))

select {
case c.packetsToRead <- p:
default:
// drop the packet if the channel is full
}
}

var _ net.PacketConn = &SimConn{}

// Close implements net.PacketConn
func (c *SimConn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
c.closed = true
close(c.closedChan)
return nil
}

// ReadFrom implements net.PacketConn
func (c *SimConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return 0, nil, net.ErrClosed
}
deadline := c.readDeadline
c.mu.Unlock()

if !deadline.IsZero() && time.Now().After(deadline) {
return 0, nil, ErrDeadlineExceeded
}

var pkt Packet
if !deadline.IsZero() {
select {
case pkt = <-c.packetsToRead:
case <-time.After(time.Until(deadline)):
return 0, nil, ErrDeadlineExceeded
}
} else {
pkt = <-c.packetsToRead
}

n = copy(p, pkt.buf)
// if the provided buffer is not enough to read the whole packet, we drop
// the rest of the data. this is similar to what `recvfrom` does on Linux
// and macOS.
return n, pkt.From, nil
}

// WriteTo implements net.PacketConn
func (c *SimConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return 0, net.ErrClosed
}
deadline := c.writeDeadline
c.mu.Unlock()

if !deadline.IsZero() && time.Now().After(deadline) {
return 0, ErrDeadlineExceeded
}

c.packetsSent.Add(1)
c.bytesSent.Add(int64(len(p)))

pkt := Packet{
From: c.myAddr,
To: addr,
buf: slices.Clone(p),
}
return len(p), c.router.SendPacket(pkt)
}

func (c *SimConn) UnicastAddr() net.Addr {
return c.myAddr
}

// LocalAddr implements net.PacketConn
func (c *SimConn) LocalAddr() net.Addr {
if c.myLocalAddr != nil {
return c.myLocalAddr
}
return c.myAddr
}

// SetDeadline implements net.PacketConn
func (c *SimConn) SetDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
c.readDeadline = t
c.writeDeadline = t
return nil
}

// SetReadDeadline implements net.PacketConn
func (c *SimConn) SetReadDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
c.readDeadline = t
return nil
}

// SetWriteDeadline implements net.PacketConn
func (c *SimConn) SetWriteDeadline(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
c.writeDeadline = t
return nil
}

func IntToPublicIPv4(n int) net.IP {
n += 1
// Avoid private IP ranges
b := make([]byte, 4)
b[0] = byte((n>>24)&0xFF | 1)
b[1] = byte((n >> 16) & 0xFF)
b[2] = byte((n >> 8) & 0xFF)
b[3] = byte(n & 0xFF)

ip := net.IPv4(b[0], b[1], b[2], b[3])

// Check and modify if it's in private ranges
if ip.IsPrivate() {
b[0] = 1 // Use 1.x.x.x as public range
}

return ip
}
Loading
Loading