Skip to content

Commit

Permalink
lntest: wait for ChanUpdate req to be fully processed before sending …
Browse files Browse the repository at this point in the history
…another

Before this commit, it was possible for a request to be sent on the
`chanWatchRequests` channel in `WaitForChannelPolicyUpdate` and then for
the `ticker.C` case to select _before_ the `eventChan` select gets
triggered when the `topologyWatcher` closes the `eventChan` in its call
to `handlePolicyUpdateWatchRequest`. This could lead to a "close of a
closed channel" panic.

To fix this, this commit ensures that we only move on to the next
iteration of the select statement in `WaitForChannelPolicyUpdate` once
the request sent on `chanWatchRequests` has been fully handled.
  • Loading branch information
ellemouton committed Mar 3, 2025
1 parent f744a54 commit 1a74ba4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ The underlying functionality between those two options remain the same.
* Add a new CI-step to do some basic [backwards compatibility
testing](https://github.com/lightningnetwork/lnd/pull/9540) for each PR.

* [Fix]() an integration test flake that could lead to a "close of a closed
channel" panic.

## Database

* [Migrate the mission control
Expand Down
64 changes: 47 additions & 17 deletions lntest/node/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type chanWatchRequest struct {
advertisingNode string
policy *lnrpc.RoutingPolicy
includeUnannounced bool

// handled is a channel that will be closed once the request has been
// handled by the topologyWatcher goroutine.
handled chan struct{}
}

// nodeWatcher is a topology watcher for a HarnessNode. It keeps track of all
Expand Down Expand Up @@ -154,6 +158,7 @@ func (nw *nodeWatcher) WaitForChannelOpen(chanPoint *lnrpc.ChannelPoint) error {
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchOpenChannel,
handled: make(chan struct{}),
}

timer := time.After(wait.DefaultTimeout)
Expand Down Expand Up @@ -185,6 +190,7 @@ func (nw *nodeWatcher) WaitForChannelClose(
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchCloseChannel,
handled: make(chan struct{}),
}

timer := time.After(wait.DefaultTimeout)
Expand Down Expand Up @@ -216,7 +222,30 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
timer := time.After(wait.DefaultTimeout)
defer ticker.Stop()

eventChan := make(chan struct{})
// onTimeout is a helper function that will be called in case the
// expected policy is not found before the timeout.
onTimeout := func() error {
expected, err := json.MarshalIndent(policy, "", "\t")
if err != nil {
return fmt.Errorf("encode policy err: %w", err)
}

policies, err := syncMapToJSON(&nw.state.policyUpdates.Map)
if err != nil {
return err
}

return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(), advertisingNode.PubKeyStr,
expected, policies)
}

var (
eventChan = make(chan struct{})
handled = make(chan struct{})
)
for {
select {
// Send a watch request every second.
Expand All @@ -237,28 +266,25 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
policy: policy,
advertisingNode: advertisingNode.PubKeyStr,
includeUnannounced: includeUnannounced,
handled: handled,
}

// We wait for the topologyWatcher to signal that
// it has completed the handling of the request so that
// we don't send a new request before the previous one
// has been processed as this could lead to a double
// closure of the eventChan channel.
select {
case <-handled:
case <-timer:
return onTimeout()
}

case <-eventChan:
return nil

case <-timer:
expected, err := json.MarshalIndent(policy, "", "\t")
if err != nil {
return fmt.Errorf("encode policy err: %w", err)
}
policies, err := syncMapToJSON(
&nw.state.policyUpdates.Map,
)
if err != nil {
return err
}

return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(),
advertisingNode.PubKeyStr, expected, policies)
return onTimeout()
}
}
}
Expand Down Expand Up @@ -341,6 +367,10 @@ func (nw *nodeWatcher) topologyWatcher(ctxb context.Context,
nw.handlePolicyUpdateWatchRequest(watchRequest)
}

// Signal to the caller that the request has been
// handled.
close(watchRequest.handled)

case <-ctxb.Done():
return
}
Expand Down

0 comments on commit 1a74ba4

Please sign in to comment.