Skip to content

Commit c80e630

Browse files
[Libp2p] Add Libp2p module (part 4) (#545)
## Description This is another of a series of PRs split out from #500. Here we add a new `modules.P2PModule` implementation which utilizes the `typesP2P.Network` implementation which was added in #540. It will be utilized together with the config changes introduced by #535 in forthcoming changes to the node and debug CLI. ## Issue #347 ## Type of change Please mark the relevant option(s): - [x] New feature, functionality or library - [ ] Bug fix - [ ] Code health or cleanup - [ ] Major breaking change - [ ] Documentation - [ ] Other <!-- add details here if it a different type of change --> ## List of changes - Added a new `modules.P2PModule` implementation to the `libp2p` module directory ## Testing - [x] `make develop_test` - [x] [LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md) w/ all of the steps outlined in the `README` ## Required Checklist - [x] I have performed a self-review of my own code - [x] I have commented my code, particularly in hard-to-understand areas - [ ] I have tested my changes using the available tooling - [x] I have updated the corresponding CHANGELOG ### If Applicable Checklist - [ ] I have updated the corresponding README(s); local and/or global - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have added, or updated, [mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding README(s) - [ ] I have added, or updated, documentation and [mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*` if I updated `shared/*`README(s)
1 parent 43d5c2b commit c80e630

File tree

5 files changed

+388
-1
lines changed

5 files changed

+388
-1
lines changed

libp2p/docs/CHANGELOG.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
55

66
## [Unreleased]
77

8+
## [0.0.0.2] - 2023-03-03
9+
10+
- Added a new `modules.P2PModule` implementation to the `libp2p` module directory
11+
812
## [0.0.0.1] - 2023-03-03
913

1014
- Added a new `typesP2P.Network` implementation to the `libp2p` module directory
1115
- Added `PoktProtocolID` for use within the libp2p module or by public API consumers
1216

13-
## [0.0.0.0] - 2023-03-01
17+
## [0.0.0.0] - 2023-02-23
1418

1519
- prepare pocket repo new libp2p module
1620
- add pocket / libp2p identity helpers

libp2p/module.go

+374
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,374 @@
1+
/*
2+
TECHDEBT: This module currently imports types from the "legacy" P2P module.
3+
4+
Migration path:
5+
1. Redefine P2P concrete types in terms of interfaces
6+
- PeersManager (raintree/peersManager)
7+
- Peer (p2p/types/NetworkPeer)
8+
- AddrBook (p2p/types/AddrBook)
9+
- AddrBookMap (p2p/types/NetworkPeer)
10+
- rainTreeNetwork doesn't depend on any concrete p2p types
11+
2. Simplify libp2p module implementation
12+
- Transport likely reduces to nothing
13+
- Network interface can be simplified
14+
- Consider renaming network as it functions more like a "router"
15+
(NB: could be replaced in future iterations with a "raintree pubsub router")
16+
3. Remove "legacy" P2P module & rename libp2p module directory (possibly object names as well)
17+
- P2PModule interface can be simplified
18+
- Clean up TECHDEBT introduced in debug CLI and node startup
19+
*/
20+
package libp2p
21+
22+
import (
23+
"context"
24+
"fmt"
25+
"io"
26+
"time"
27+
28+
"github.com/libp2p/go-libp2p"
29+
pubsub "github.com/libp2p/go-libp2p-pubsub"
30+
"github.com/libp2p/go-libp2p/core/host"
31+
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
32+
"github.com/multiformats/go-multiaddr"
33+
"google.golang.org/protobuf/proto"
34+
"google.golang.org/protobuf/types/known/anypb"
35+
36+
"github.com/pokt-network/pocket/libp2p/network"
37+
"github.com/pokt-network/pocket/libp2p/protocol"
38+
"github.com/pokt-network/pocket/logger"
39+
typesP2P "github.com/pokt-network/pocket/p2p/types"
40+
"github.com/pokt-network/pocket/runtime/configs"
41+
"github.com/pokt-network/pocket/runtime/configs/types"
42+
"github.com/pokt-network/pocket/shared/crypto"
43+
"github.com/pokt-network/pocket/shared/messaging"
44+
"github.com/pokt-network/pocket/shared/modules"
45+
"github.com/pokt-network/pocket/shared/modules/base_modules"
46+
)
47+
48+
var _ modules.P2PModule = &libp2pModule{}
49+
50+
type libp2pModule struct {
51+
base_modules.IntegratableModule
52+
53+
logger *modules.Logger
54+
cfg *configs.P2PConfig
55+
identity libp2p.Option
56+
listenAddrs libp2p.Option
57+
// host encapsulates libp2p peerstore & connection manager
58+
host host.Host
59+
// pubsub is used for broadcast communication
60+
// (i.e. multiple, unidentified receivers)
61+
pubsub *pubsub.PubSub
62+
// topic similar to pubsub but received messages are filtered by a "topic" string.
63+
// Published messages are also given the respective topic before broadcast.
64+
topic *pubsub.Topic
65+
// subscription provides an interface to continuously read messages from.
66+
subscription *pubsub.Subscription
67+
network typesP2P.Network
68+
}
69+
70+
var (
71+
// TECHDEBT: configure timeouts. Consider security exposure vs. real-world conditions).
72+
// TECHDEBT: parameterize and expose via config.
73+
// readStreamTimeout is the duration to wait for a read operation on a
74+
// stream to complete, after which the stream is closed ("timed out").
75+
readStreamTimeoutDuration = time.Second * 10
76+
)
77+
78+
func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
79+
return new(libp2pModule).Create(bus, options...)
80+
}
81+
82+
func (mod *libp2pModule) GetModuleName() string {
83+
return modules.P2PModuleName
84+
}
85+
86+
func (_ *libp2pModule) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
87+
logger.Global.Debug().Msg("Creating libp2p-backed network module")
88+
mod := &libp2pModule{
89+
cfg: bus.GetRuntimeMgr().GetConfig().P2P,
90+
logger: logger.Global.CreateLoggerForModule(modules.P2PModuleName),
91+
}
92+
93+
// MUST call before referencing mod.bus to ensure != nil.
94+
bus.RegisterModule(mod)
95+
96+
for _, option := range options {
97+
option(mod)
98+
}
99+
100+
// TECHDEBT: investigate any unnecessary
101+
// key exposure / duplication in memory
102+
privateKey, err := crypto.NewLibP2PPrivateKey(mod.cfg.PrivateKey)
103+
if err != nil {
104+
return nil, fmt.Errorf("loading private key: %w", err)
105+
}
106+
107+
mod.identity = libp2p.Identity(privateKey)
108+
109+
// INCOMPLETE: support RainTree network
110+
if mod.cfg.UseRainTree {
111+
return nil, fmt.Errorf("%s", "raintree is not yet compatible with libp2p")
112+
}
113+
114+
switch mod.cfg.ConnectionType {
115+
case types.ConnectionType_TCPConnection:
116+
addr, err := mod.getMultiaddr()
117+
if err != nil {
118+
return nil, fmt.Errorf("parsing multiaddr from config: %w", err)
119+
}
120+
mod.listenAddrs = libp2p.ListenAddrs(addr)
121+
case types.ConnectionType_EmptyConnection:
122+
mod.listenAddrs = libp2p.NoListenAddrs
123+
default:
124+
return nil, fmt.Errorf(
125+
// DISCUSS: rename to "transport protocol" instead.
126+
"unsupported connection type: %s: %w",
127+
mod.cfg.ConnectionType,
128+
err,
129+
)
130+
}
131+
132+
return mod, nil
133+
}
134+
135+
func (mod *libp2pModule) Start() error {
136+
// IMPROVE: receive context in interface methods.
137+
ctx := context.Background()
138+
139+
// TECHDEBT: metrics integration.
140+
var err error
141+
opts := []libp2p.Option{
142+
mod.identity,
143+
// INCOMPLETE(#544): add transport security!
144+
}
145+
146+
// Disable unused libp2p relay and ping services in client debug mode.
147+
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#DisableRelay
148+
// and https://pkg.go.dev/github.com/libp2p/go-libp2p#Ping)
149+
if mod.isClientDebugMode() {
150+
opts = append(opts,
151+
libp2p.DisableRelay(),
152+
libp2p.Ping(false),
153+
libp2p.NoListenAddrs,
154+
)
155+
} else {
156+
opts = append(opts, mod.listenAddrs)
157+
}
158+
159+
// Represents a libp2p network node, `libp2p.New` configures
160+
// and starts listening according to options.
161+
// (see: https://pkg.go.dev/github.com/libp2p/go-libp2p#section-readme)
162+
mod.host, err = libp2p.New(opts...)
163+
if err != nil {
164+
return fmt.Errorf("unable to create libp2p host: %w", err)
165+
}
166+
167+
listenAddrLogEvent := mod.logger.Info()
168+
for i, addr := range host.InfoFromHost(mod.host).Addrs {
169+
listenAddrLogEvent.Str(fmt.Sprintf("listen_addr_%d", i), addr.String())
170+
}
171+
listenAddrLogEvent.Msg("Listening for incoming connections...")
172+
173+
// TECHDEBT: use RandomSub or GossipSub once we're on more stable ground.
174+
// IMPROVE: consider supporting multiple router types via config.
175+
mod.pubsub, err = pubsub.NewFloodSub(ctx, mod.host)
176+
if err != nil {
177+
return fmt.Errorf("unable to create pubsub: %w", err)
178+
}
179+
180+
// Topic is used to `#Publish` messages.
181+
mod.topic, err = mod.pubsub.Join(protocol.DefaultTopicStr)
182+
if err != nil {
183+
return fmt.Errorf("unable to join pubsub topic: %w", err)
184+
}
185+
186+
// Subscription is notified when a new message is received on the topic.
187+
mod.subscription, err = mod.topic.Subscribe()
188+
if err != nil {
189+
return fmt.Errorf("subscribing to pubsub topic: %w", err)
190+
}
191+
192+
mod.network, err = network.NewLibp2pNetwork(mod.GetBus(), mod.logger, mod.host, mod.topic)
193+
if err != nil {
194+
return fmt.Errorf("creating network: %w", err)
195+
}
196+
197+
// Don't handle streams or read from the subscription in client debug mode.
198+
if !mod.isClientDebugMode() {
199+
mod.host.SetStreamHandler(protocol.PoktProtocolID, mod.handleStream)
200+
go mod.readFromSubscription(ctx)
201+
}
202+
return nil
203+
}
204+
205+
func (mod *libp2pModule) Stop() error {
206+
return mod.host.Close()
207+
}
208+
209+
func (mod *libp2pModule) Broadcast(msg *anypb.Any) error {
210+
c := &messaging.PocketEnvelope{
211+
Content: msg,
212+
}
213+
//TECHDEBT: use shared/codec for marshalling
214+
data, err := proto.MarshalOptions{Deterministic: true}.Marshal(c)
215+
if err != nil {
216+
return err
217+
}
218+
mod.logger.Info().Msg("broadcasting message to network")
219+
220+
return mod.network.NetworkBroadcast(data)
221+
}
222+
223+
func (mod *libp2pModule) Send(addr crypto.Address, msg *anypb.Any) error {
224+
c := &messaging.PocketEnvelope{
225+
Content: msg,
226+
}
227+
//TECHDEBT: use shared/codec for marshalling
228+
data, err := proto.MarshalOptions{Deterministic: true}.Marshal(c)
229+
if err != nil {
230+
return err
231+
}
232+
233+
return mod.network.NetworkSend(data, addr)
234+
}
235+
236+
func (mod *libp2pModule) GetAddress() (crypto.Address, error) {
237+
privateKey, err := crypto.NewPrivateKey(mod.cfg.PrivateKey)
238+
if err != nil {
239+
return nil, err
240+
}
241+
242+
return privateKey.Address(), nil
243+
}
244+
245+
// HandleEvent implements the respective `modules.Module` interface method.
246+
func (mod *libp2pModule) HandleEvent(msg *anypb.Any) error {
247+
return nil
248+
}
249+
250+
func (mod *libp2pModule) isClientDebugMode() bool {
251+
return mod.GetBus().GetRuntimeMgr().GetConfig().ClientDebugMode
252+
}
253+
254+
// handleStream is called each time a peer establishes a new stream with this
255+
// module's libp2p `host.Host`.
256+
func (mod *libp2pModule) handleStream(stream libp2pNetwork.Stream) {
257+
peer, err := network.PeerFromLibp2pStream(stream)
258+
if err != nil {
259+
mod.logger.Error().Err(err).
260+
Str("address", peer.Address.String()).
261+
Msg("parsing remote peer public key")
262+
263+
if err = stream.Close(); err != nil {
264+
mod.logger.Error().Err(err)
265+
}
266+
}
267+
268+
if err := mod.network.AddPeerToAddrBook(peer); err != nil {
269+
mod.logger.Error().Err(err).
270+
Str("address", peer.Address.String()).
271+
Msg("adding remote peer to address book")
272+
}
273+
274+
go mod.readStream(stream)
275+
}
276+
277+
// readStream is intended to be called in a goroutine. It continuously reads from
278+
// the given stream for handling at the network level. Used for handling "direct"
279+
// messages (i.e. one specific target node).
280+
func (mod *libp2pModule) readStream(stream libp2pNetwork.Stream) {
281+
closeStream := func() {
282+
if err := stream.Close(); err != nil {
283+
mod.logger.Error().Err(err)
284+
}
285+
}
286+
287+
// NB: time out if no data is sent to free resources.
288+
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
289+
mod.logger.Error().Err(err).Msg("setting stream read deadline")
290+
// TODO: abort if we can't set a read deadline?
291+
}
292+
293+
data, err := io.ReadAll(stream)
294+
if err != nil {
295+
mod.logger.Error().Err(err).Msg("reading from stream")
296+
closeStream()
297+
// NB: abort this goroutine
298+
// TODO: signal this somewhere?
299+
return
300+
}
301+
defer closeStream()
302+
303+
mod.handleNetworkData(data)
304+
}
305+
306+
// readFromSubscription is intended to be called in a goroutine. It continuously
307+
// reads from the subscribed topic in preparation for handling at the network level.
308+
// Used for handling "broadcast" messages (i.e. no specific target node).
309+
func (mod *libp2pModule) readFromSubscription(ctx context.Context) {
310+
for {
311+
select {
312+
case <-ctx.Done():
313+
return
314+
default:
315+
msg, err := mod.subscription.Next(ctx)
316+
if err != nil {
317+
mod.logger.Error().Err(err).
318+
Bool("TODO", true).
319+
Msg("reading from subscription")
320+
}
321+
322+
// NB: ignore messages from self
323+
if msg.ReceivedFrom == mod.host.ID() {
324+
continue
325+
}
326+
327+
mod.handleNetworkData(msg.Data)
328+
}
329+
}
330+
}
331+
332+
func (mod *libp2pModule) handleNetworkData(data []byte) {
333+
appMsgData, err := mod.network.HandleNetworkData(data)
334+
if err != nil {
335+
mod.logger.Error().Err(err).Msg("handling network data")
336+
return
337+
}
338+
339+
// There was no error, but we don't need to forward this to the app-specific bus.
340+
// For example, the message has already been handled by the application.
341+
if appMsgData == nil {
342+
return
343+
}
344+
345+
networkMessage := messaging.PocketEnvelope{}
346+
if err := proto.Unmarshal(appMsgData, &networkMessage); err != nil {
347+
mod.logger.Error().Err(err).
348+
Bool("TODO", true).
349+
Msg("Error decoding network message")
350+
return
351+
}
352+
353+
event := messaging.PocketEnvelope{
354+
Content: networkMessage.Content,
355+
}
356+
357+
mod.GetBus().PublishEventToBus(&event)
358+
}
359+
360+
// getMultiaddr returns a multiaddr constructed from the `hostname` and `port`
361+
// in the P2P config which pas provided upon creation.
362+
func (mod *libp2pModule) getMultiaddr() (multiaddr.Multiaddr, error) {
363+
// TECHDEBT: as soon as we add support for multiple transports
364+
// (i.e. not just TCP), we'll need to do something else.
365+
return network.Libp2pMultiaddrFromServiceURL(fmt.Sprintf(
366+
"%s:%d", mod.cfg.Hostname, mod.cfg.Port,
367+
))
368+
}
369+
370+
// newReadStreamDeadline returns a future deadline
371+
// based on the read stream timeout duration.
372+
func newReadStreamDeadline() time.Time {
373+
return time.Now().Add(readStreamTimeoutDuration)
374+
}

0 commit comments

Comments
 (0)