-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathgraph_cache.go
450 lines (373 loc) · 13.4 KB
/
graph_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
package graphdb
import (
"fmt"
"sync"
"github.com/btcsuite/btcd/btcutil"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
// GraphCacheNode describes the minimal view of a lightning node that the graph
// cache requires in order to index the node and its channels.
type GraphCacheNode interface {
	// PubKey returns the public identity key of the node.
	PubKey() route.Vertex

	// Features returns the p2p feature vector of the node.
	Features() *lnwire.FeatureVector

	// ForEachChannel walks over every channel of the node and invokes cb
	// with the channel's edge info and the policies of both directions.
	// The first policy handed to cb is the outgoing edge *to* the
	// connecting node, the second one is the incoming edge *from* the
	// connecting node. Iteration stops at the first error returned by cb
	// and that error is passed back to the caller.
	ForEachChannel(tx kvdb.RTx,
		cb func(tx kvdb.RTx, info *models.ChannelEdgeInfo,
			outPolicy *models.ChannelEdgePolicy,
			inPolicy *models.ChannelEdgePolicy) error) error
}
// DirectedChannel is a type that stores the channel information as seen from
// one side of the channel. The graph cache keeps one instance per channel for
// each of the two nodes the channel connects.
type DirectedChannel struct {
// ChannelID is the unique identifier of this channel.
ChannelID uint64
// IsNode1 indicates if this is the node with the smaller public key.
IsNode1 bool
// OtherNode is the public key of the node on the other end of this
// channel.
OtherNode route.Vertex
// Capacity is the announced capacity of this channel in satoshis.
Capacity btcutil.Amount
// OutPolicySet is a boolean that indicates whether the node has an
// outgoing policy set. For pathfinding only the existence of the policy
// is important to know, not the actual content.
OutPolicySet bool
// InPolicy is the incoming policy *from* the other node to this node.
// In path finding, we're walking backward from the destination to the
// source, so we're always interested in the edge that arrives to us
// from the other node. It stays nil until an incoming policy has been
// recorded for this side of the channel.
InPolicy *models.CachedEdgePolicy
// InboundFee is the inbound fee of this node. The cache fills it in
// from the extra opaque data of this node's own outgoing policy.
InboundFee lnwire.Fee
}
// DeepCopy returns a copy of the channel that shares no policy state with the
// receiver: the incoming policy, if any, is duplicated as well.
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
	dup := *c

	// Without an incoming policy the shallow copy is already independent.
	if c.InPolicy == nil {
		return &dup
	}

	policy := *c.InPolicy

	// The path finding algorithm may overwrite the ToNode fields, which is
	// the reason a deep copy is required at all. They are reset to nil so
	// that the copy never carries over stale data.
	policy.ToNodePubKey = nil
	policy.ToNodeFeatures = nil

	dup.InPolicy = &policy

	return &dup
}
// GraphCache is a type that holds a minimal set of information of the public
// channel graph that can be used for pathfinding.
type GraphCache struct {
// nodeChannels maps a node's public key to the channels of that node,
// keyed by channel ID. Each entry describes the channel as seen from
// that node's side.
nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
// nodeFeatures maps a node's public key to the feature vector that was
// last added for that node.
nodeFeatures map[route.Vertex]*lnwire.FeatureVector
// mtx guards access to nodeChannels and nodeFeatures.
mtx sync.RWMutex
}
// NewGraphCache returns an empty GraphCache whose internal maps are pre-sized
// based on preAllocNumNodes.
func NewGraphCache(preAllocNumNodes int) *GraphCache {
	// The channel index is sized at twice the node hint, since a channel
	// connects two nodes and can therefore be looked up from both sides.
	channels := make(
		map[route.Vertex]map[uint64]*DirectedChannel,
		preAllocNumNodes*2,
	)
	features := make(
		map[route.Vertex]*lnwire.FeatureVector, preAllocNumNodes,
	)

	return &GraphCache{
		nodeChannels: channels,
		nodeFeatures: features,
	}
}
// Stats returns a human readable summary of the current cache size. The
// channel count is the sum of all per-node channel entries, so a channel that
// is cached for both of its nodes contributes twice.
func (c *GraphCache) Stats() string {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	var directedChannels int
	for _, chans := range c.nodeChannels {
		directedChannels += len(chans)
	}

	featureCount, nodeCount := len(c.nodeFeatures), len(c.nodeChannels)

	return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
		"num_channels=%d", featureCount, nodeCount, directedChannels)
}
// AddNodeFeatures stores the features of the given graph node in the cache,
// replacing any features previously recorded for the same public key.
func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) {
	key := node.PubKey()

	// The lock is only held for the duration of the map write so that
	// callers doing slower work afterwards do not block the cache.
	c.mtx.Lock()
	defer c.mtx.Unlock()

	c.nodeFeatures[key] = node.Features()
}
// AddNode caches the features of a graph node and then every (directed)
// channel the node reports through its ForEachChannel method. Any error
// returned by that iteration is passed back to the caller.
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
	c.AddNodeFeatures(node)

	// AddChannel takes the cache lock itself, so the callback only needs
	// to forward the edge and its two policies.
	addChannel := func(_ kvdb.RTx, info *models.ChannelEdgeInfo,
		outPolicy, inPolicy *models.ChannelEdgePolicy) error {

		c.AddChannel(info, outPolicy, inPolicy)

		return nil
	}

	return node.ForEachChannel(tx, addChannel)
}
// AddChannel registers an undirected channel with the cache. The two policies
// may be passed in any order because the direction of each one is derived from
// the edge info and the policy's channel flags. A policy ends up as the
// outgoing policy of one node and as the incoming policy on the peer's side.
// Nothing is added if info is nil or if both policies are present and
// disabled.
func (c *GraphCache) AddChannel(info *models.ChannelEdgeInfo,
	policy1 *models.ChannelEdgePolicy, policy2 *models.ChannelEdgePolicy) {

	if info == nil {
		return
	}

	bothDisabled := policy1 != nil && policy1.IsDisabled() &&
		policy2 != nil && policy2.IsDisabled()
	if bothDisabled {
		return
	}

	// Every channel is stored once per end point, each time described from
	// that node's point of view.
	forNode1 := &DirectedChannel{
		ChannelID: info.ChannelID,
		IsNode1:   true,
		OtherNode: info.NodeKey2Bytes,
		Capacity:  info.Capacity,
	}
	forNode2 := &DirectedChannel{
		ChannelID: info.ChannelID,
		IsNode1:   false,
		OtherNode: info.NodeKey1Bytes,
		Capacity:  info.Capacity,
	}

	c.mtx.Lock()
	c.updateOrAddEdge(info.NodeKey1Bytes, forNode1)
	c.updateOrAddEdge(info.NodeKey2Bytes, forNode2)
	c.mtx.Unlock()

	// The node referenced by a policy is always the to_node. Policy 1
	// pointing at node 2 therefore is the policy as seen from node 1,
	// otherwise the direction is reversed.
	if policy1 != nil {
		fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
		if policy1.ToNode == info.NodeKey2Bytes {
			fromNode, toNode = info.NodeKey1Bytes, info.NodeKey2Bytes
		}

		isEdge1 := policy1.ChannelFlags&lnwire.ChanUpdateDirection == 0
		c.UpdatePolicy(policy1, fromNode, toNode, isEdge1)
	}

	// The same reasoning applies to policy 2 with the roles of the two
	// nodes swapped.
	if policy2 != nil {
		fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
		if policy2.ToNode == info.NodeKey1Bytes {
			fromNode, toNode = info.NodeKey2Bytes, info.NodeKey1Bytes
		}

		isEdge1 := policy2.ChannelFlags&lnwire.ChanUpdateDirection == 0
		c.UpdatePolicy(policy2, fromNode, toNode, isEdge1)
	}
}
// updateOrAddEdge stores the edge under the given node, replacing an existing
// entry with the same channel ID or adding a new one. The node's channel map
// is created if the node has no channels yet. The caller is expected to hold
// the write lock.
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
	chans := c.nodeChannels[node]
	if len(chans) == 0 {
		chans = make(map[uint64]*DirectedChannel)
		c.nodeChannels[node] = chans
	}

	chans[edge.ChannelID] = edge
}
// UpdatePolicy applies a single policy to the cached channel entries of both
// the from and the to node. The order in which the two nodes are passed is not
// strictly important. The channel edge is assumed to have been added before,
// so that the directed channel structs already exist; a node or channel that
// is missing from the cache is logged and skipped.
func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
	toNode route.Vertex, edge1 bool) {

	// Pull the inbound fee out of the extra data if it is there. A policy
	// whose records cannot be decoded is ignored altogether.
	var fee lnwire.Fee
	_, extractErr := policy.ExtraOpaqueData.ExtractRecords(&fee)
	if extractErr != nil {
		log.Errorf("Failed to extract records from edge policy %v: %v",
			policy.ChannelID, extractErr)

		return
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()

	for _, nodeKey := range [2]route.Vertex{fromNode, toNode} {
		nodeChans := c.nodeChannels[nodeKey]
		if len(nodeChans) == 0 {
			log.Warnf("Node=%v not found in graph cache", nodeKey)
			continue
		}

		channel, found := nodeChans[policy.ChannelID]
		if !found {
			log.Warnf("Channel=%v not found in graph cache",
				policy.ChannelID)
			continue
		}

		// Edge 1 is the policy for the direction node1 -> node2. The
		// policy is this node's outgoing one exactly when the node's
		// position matches the edge: node 1 with edge 1, or node 2
		// with edge 2.
		if channel.IsNode1 == edge1 {
			channel.OutPolicySet = true
			channel.InboundFee = fee

			continue
		}

		// In the two remaining combinations the policy is the inbound
		// one for this node.
		channel.InPolicy = models.NewCachedPolicy(policy)
	}
}
// RemoveNode drops a node from the cache together with all of its channels,
// including the entries kept for those channels on the peers' side.
func (c *GraphCache) RemoveNode(node route.Vertex) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Each peer holds its own view of the shared channel, so those views
	// are cleaned up before the node's own channel map is discarded.
	for _, ch := range c.nodeChannels[node] {
		c.removeChannelIfFound(ch.OtherNode, ch.ChannelID)
	}

	// Finally forget everything that is keyed by the node itself.
	delete(c.nodeChannels, node)
	delete(c.nodeFeatures, node)
}
// RemoveChannel deletes the channel with the given ID from the channel lists
// of both node1 and node2.
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// The channel is stored once per end point, so both are visited.
	for _, endpoint := range [2]route.Vertex{node1, node2} {
		c.removeChannelIfFound(endpoint, chanID)
	}
}
// removeChannelIfFound deletes the channel from the given node's side only.
// It does nothing if the node has no channels in the cache. The caller is
// expected to hold the write lock.
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
	if chans := c.nodeChannels[node]; len(chans) > 0 {
		delete(chans, chanID)
	}
}
// UpdateChannel refreshes the capacity and the peer key that are cached for a
// specific edge on both of its sides. The edge is expected to already exist
// and be known. The call is a no-op if info is nil or if either of the two
// nodes has no channels in the cache; a side on which the channel itself is
// missing is left untouched.
func (c *GraphCache) UpdateChannel(info *models.ChannelEdgeInfo) {
	// Mirror AddChannel and treat a missing edge info as a no-op instead
	// of dereferencing a nil pointer below.
	if info == nil {
		return
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()

	if len(c.nodeChannels[info.NodeKey1Bytes]) == 0 ||
		len(c.nodeChannels[info.NodeKey2Bytes]) == 0 {

		return
	}

	// We only expect to be called when the channel is already known, so
	// each side is only updated if the channel is present there.
	channel, ok := c.nodeChannels[info.NodeKey1Bytes][info.ChannelID]
	if ok {
		channel.Capacity = info.Capacity
		channel.OtherNode = info.NodeKey2Bytes
	}

	channel, ok = c.nodeChannels[info.NodeKey2Bytes][info.ChannelID]
	if ok {
		channel.Capacity = info.Capacity
		channel.OtherNode = info.NodeKey1Bytes
	}
}
// getChannels returns deep copies of all channels cached for the passed node,
// or nil if the node has no entry in the channel index.
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	nodeChans, found := c.nodeChannels[node]
	if !found {
		return nil
	}

	// Features that were explicitly stored as nil are kept as they are:
	// the router overwrites the destination's features with those from
	// the invoice if necessary. Only when no features were recorded at all
	// (no node announcement yet) do we fall back to an empty vector, which
	// mimics the old DB based code that never left the value nil.
	features, found := c.nodeFeatures[node]
	if !found {
		features = lnwire.EmptyFeatureVector()
	}

	self := func() route.Vertex {
		return node
	}

	result := make([]*DirectedChannel, 0, len(nodeChans))
	for _, cached := range nodeChans {
		// Hand out a copy of the channel and its policy so that fields
		// set by the path finding algorithm (currently only the
		// ToNodeFeatures of the policy) never leak back into the
		// cache.
		dup := cached.DeepCopy()
		if dup.InPolicy != nil {
			dup.InPolicy.ToNodePubKey = self
			dup.InPolicy.ToNodeFeatures = features
		}

		result = append(result, dup)
	}

	return result
}
// ForEachChannel calls cb once for every channel of the given node and stops
// at the first error cb returns, passing that error back to the caller.
func (c *GraphCache) ForEachChannel(node route.Vertex,
	cb func(channel *DirectedChannel) error) error {

	// Work on a copy of the node's channels rather than on the cache
	// itself. This avoids deadlocks caused by the graph cache, the channel
	// state and the graph database being used from multiple goroutines.
	// The snapshot is only used for path finding, where a slightly stale
	// view is acceptable: the real world graph and our representation can
	// always drift apart for a short time, and the actual channel state is
	// stored separately.
	snapshot := c.getChannels(node)
	for i := range snapshot {
		if err := cb(snapshot[i]); err != nil {
			return err
		}
	}

	return nil
}
// ForEachNode walks the adjacency list of the graph and invokes cb for every
// node together with the set of channels that emanate from it. The read lock
// is held for the whole iteration, which ends at the first error returned by
// cb.
//
// NOTE: This method should be considered _read only_, the channels or nodes
// passed in MUST NOT be modified.
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
	channels map[uint64]*DirectedChannel) error) error {

	c.mtx.RLock()
	defer c.mtx.RUnlock()

	for pubKey, chans := range c.nodeChannels {
		// The cached map is passed on without copying since this is a
		// read-only RPC call. The node features are not needed for
		// this call either.
		err := cb(pubKey, chans)
		if err != nil {
			return err
		}
	}

	return nil
}
// GetFeatures looks up the features cached for the node with the given ID. An
// empty feature vector is returned when no features are known for the node.
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	// A missing entry reads as nil, so one check covers both an unknown
	// node and features that were stored as nil.
	if features := c.nodeFeatures[node]; features != nil {
		return features
	}

	// The router expects the features to never be nil, hence the empty
	// feature set.
	return lnwire.EmptyFeatureVector()
}