Skip to content

Commit

Permalink
Merge pull request #427 from cannium/track-more-observed-address-info
Browse files Browse the repository at this point in the history
Track more info for observed addresses
  • Loading branch information
Stebalien authored Oct 1, 2018
2 parents 2787133 + 92ec4b2 commit 57ed88e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 61 deletions.
3 changes: 1 addition & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"io"
"time"

identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
Expand All @@ -15,6 +13,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
msmux "github.com/multiformats/go-multistream"
Expand Down
3 changes: 2 additions & 1 deletion p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) {

// ok! we have the observed version of one of our ListenAddresses!
log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr)
ids.observedAddrs.Add(maddr, c.RemoteMultiaddr())
ids.observedAddrs.Add(maddr, c.LocalMultiaddr(), c.RemoteMultiaddr(),
c.Stat().Direction)
}

func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
Expand Down
88 changes: 54 additions & 34 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,34 @@ import (
"sync"
"time"

net "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)

const ActivationThresh = 4

type observation struct {
seenTime time.Time
connDirection net.Direction
}

// ObservedAddr is an entry for an address reported by our peers.
// We only use addresses that:
// - have been observed at least 4 times in last 1h. (counter symmetric nats)
// - have been observed at least once recently (1h), because our position in the
// network, or network port mapppings, may have changed.
type ObservedAddr struct {
Addr ma.Multiaddr
SeenBy map[string]time.Time
LastSeen time.Time
Activated bool
Addr ma.Multiaddr
SeenBy map[string]observation // peer(observer) address -> observation info
LastSeen time.Time
}

func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool {
func (oa *ObservedAddr) activated(ttl time.Duration) bool {
// cleanup SeenBy set
now := time.Now()
for k, t := range oa.SeenBy {
if now.Sub(t) > ttl*ActivationThresh {
for k, ob := range oa.SeenBy {
if now.Sub(ob.seenTime) > ttl*ActivationThresh {
delete(oa.SeenBy, k)
}
}
Expand All @@ -41,60 +46,75 @@ func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool {
type ObservedAddrSet struct {
sync.Mutex // guards whole datastruct.

addrs map[string]*ObservedAddr
// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
ttl time.Duration
}

func (oas *ObservedAddrSet) Addrs() []ma.Multiaddr {
// Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.Lock()
defer oas.Unlock()

// for zero-value.
if oas.addrs == nil {
if len(oas.addrs) == 0 {
return nil
}

now := time.Now()
addrs := make([]ma.Multiaddr, 0, len(oas.addrs))
for s, a := range oas.addrs {
// remove timed out addresses.
if now.Sub(a.LastSeen) > oas.ttl {
delete(oas.addrs, s)
continue
}

if a.Activated || a.TryActivate(oas.ttl) {
addrs = append(addrs, a.Addr)
for local, observedAddrs := range oas.addrs {
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
for _, a := range observedAddrs {
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
}
oas.addrs[local] = filteredAddrs
}
return addrs
}

func (oas *ObservedAddrSet) Add(addr ma.Multiaddr, observer ma.Multiaddr) {
func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
direction net.Direction) {

oas.Lock()
defer oas.Unlock()

// for zero-value.
if oas.addrs == nil {
oas.addrs = make(map[string]*ObservedAddr)
oas.addrs = make(map[string][]*ObservedAddr)
oas.ttl = pstore.OwnObservedAddrTTL
}

s := addr.String()
oa, found := oas.addrs[s]
now := time.Now()
observerString := observerGroup(observer)
localString := local.String()
ob := observation{
seenTime: now,
connDirection: direction,
}

// first time seeing address.
if !found {
oa = &ObservedAddr{
Addr: addr,
SeenBy: make(map[string]time.Time),
observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
if previousObserved.Addr.Equal(observed) {
observedAddrs[i].SeenBy[observerString] = ob
observedAddrs[i].LastSeen = now
return
}
oas.addrs[s] = oa
}

// mark the observer
oa.SeenBy[observerGroup(observer)] = time.Now()
oa.LastSeen = time.Now()
// observed address not seen yet, append it
oas.addrs[localString] = append(oas.addrs[localString], &ObservedAddr{
Addr: observed,
SeenBy: map[string]observation{
observerString: ob,
},
LastSeen: now,
})
}

// observerGroup is a function that determines what part of
Expand Down
56 changes: 32 additions & 24 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

net "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -49,55 +50,62 @@ func TestObsAddrSet(t *testing.T) {
b4 := m("/ip4/1.2.3.9/tcp/1237")
b5 := m("/ip4/1.2.3.10/tcp/1237")

oas := ObservedAddrSet{}
oas := &ObservedAddrSet{}

if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should be empty")
}

oas.Add(a1, a4)
oas.Add(a2, a4)
oas.Add(a3, a4)
add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) {
dummyLocal := m("/ip4/127.0.0.1/tcp/10086")
dummyDirection := net.DirOutbound

oas.Add(observed, dummyLocal, observer, dummyDirection)
}

add(oas, a1, a4)
add(oas, a2, a4)
add(oas, a3, a4)

// these are all different so we should not yet get them.
if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (once)")
}

// same observer, so should not yet get them.
oas.Add(a1, a4)
oas.Add(a2, a4)
oas.Add(a3, a4)
add(oas, a1, a4)
add(oas, a2, a4)
add(oas, a3, a4)
if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs)")
}

// different observer, but same observer group.
oas.Add(a1, a5)
oas.Add(a2, a5)
oas.Add(a3, a5)
add(oas, a1, a5)
add(oas, a2, a5)
add(oas, a3, a5)
if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs group)")
}

oas.Add(a1, b1)
oas.Add(a1, b2)
oas.Add(a1, b3)
add(oas, a1, b1)
add(oas, a1, b2)
add(oas, a1, b3)
if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1}) {
t.Error("addrs should only have a1")
}

oas.Add(a2, a5)
oas.Add(a1, a5)
oas.Add(a1, a5)
oas.Add(a2, b1)
oas.Add(a1, b1)
oas.Add(a1, b1)
oas.Add(a2, b2)
oas.Add(a1, b2)
oas.Add(a1, b2)
oas.Add(a2, b4)
oas.Add(a2, b5)
add(oas, a2, a5)
add(oas, a1, a5)
add(oas, a1, a5)
add(oas, a2, b1)
add(oas, a1, b1)
add(oas, a1, b1)
add(oas, a2, b2)
add(oas, a1, b2)
add(oas, a1, b2)
add(oas, a2, b4)
add(oas, a2, b5)
if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1, a2}) {
t.Error("addrs should only have a1, a2")
}
Expand Down

0 comments on commit 57ed88e

Please sign in to comment.