Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph: move topology subscription to ChannelGraph #9577

Open
wants to merge 4 commits into
base: elle-graphCacheBase
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
Expand Down Expand Up @@ -36,7 +36,7 @@ type ManagerCfg struct {

// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*graph.TopologyClient, error)
SubscribeTopology func() (*graphdb.TopologyClient, error)
}

// Manager is struct that manages an autopilot agent, making it possible to
Expand Down
2 changes: 0 additions & 2 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@ The underlying functionality between those two options remain the same.
- [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480)
- [Abstract invoicerpc server access](https://github.com/lightningnetwork/lnd/pull/9516)
- [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513)
- Move the [graph cache out of the graph
CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.

* [Golang was updated to
`v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462).
Expand Down
9 changes: 9 additions & 0 deletions docs/release-notes/release-notes-0.20.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@

## Code Health

* Graph abstraction and refactoring work:
- Move the [graph cache out of the graph
CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
- Move [topology
subscription](https://github.com/lightningnetwork/lnd/pull/9577) and
notification handling from the graph.Builder to the ChannelGraph.

## Tooling and Documentation

# Contributors (Alphabetical Order)

* Elle Mouton
117 changes: 6 additions & 111 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ type Builder struct {
started atomic.Bool
stopped atomic.Bool

ntfnClientCounter atomic.Uint64
bestHeight atomic.Uint32
bestHeight atomic.Uint32

cfg *Config

Expand All @@ -123,22 +122,6 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock

// topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
topologyUpdates chan any

// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]

// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the Builder. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate

// channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses.
Expand All @@ -163,14 +146,11 @@ var _ ChannelGraphSource = (*Builder)(nil)
// NewBuilder constructs a new Builder.
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
cfg: cfg,
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error {
return nil
}

// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()

topChange := &TopologyChange{}
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}

if topChange.isEmpty() {
return
}

b.notifyTopologyChange(topChange)
}

// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
Expand All @@ -701,16 +659,6 @@ func (b *Builder) networkHandler() {
}

select {
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdates:
b.wg.Add(1)
go b.handleTopologyUpdate(update)

// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.

case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
Expand Down Expand Up @@ -783,31 +731,6 @@ func (b *Builder) networkHandler() {
" processed.", chainUpdate.Height)
}

// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-b.ntfnClientUpdates:
clientID := ntfnUpdate.clientID

if ntfnUpdate.cancel {
client, ok := b.topologyClients.LoadAndDelete(
clientID,
)
if ok {
close(client.exit)
client.wg.Wait()

close(client.ntfnChan)
}

continue
}

b.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
})

// The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels
// for pruning.
Expand Down Expand Up @@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels(
log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
blockHeight, len(chansClosed))

if len(chansClosed) == 0 {
return err
}

// Notify all currently registered clients of the newly closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
b.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})

return nil
}

Expand Down Expand Up @@ -1067,12 +980,6 @@ func (b *Builder) AddNode(node *models.LightningNode,
return err
}

select {
case b.topologyUpdates <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down Expand Up @@ -1117,12 +1024,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
return err
}

select {
case b.topologyUpdates <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down Expand Up @@ -1224,12 +1125,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
return err
}

select {
case b.topologyUpdates <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down
Loading
Loading