Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lntest: wait for ChanUpdate req to be fully processed before sending another #9574

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ The underlying functionality between those two options remain the same.
* Add a new CI-step to do some basic [backwards compatibility
testing](https://github.com/lightningnetwork/lnd/pull/9540) for each PR.

* [Fix](https://github.com/lightningnetwork/lnd/pull/9574) an integration test
flake that could lead to a "close of a closed channel" panic.

## Database

* [Migrate the mission control
Expand Down
62 changes: 45 additions & 17 deletions lntest/node/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type chanWatchRequest struct {
advertisingNode string
policy *lnrpc.RoutingPolicy
includeUnannounced bool

// handled is a channel that will be closed once the request has been
// handled by the topologyWatcher goroutine.
handled chan struct{}
}

// nodeWatcher is a topology watcher for a HarnessNode. It keeps track of all
Expand Down Expand Up @@ -154,6 +158,7 @@ func (nw *nodeWatcher) WaitForChannelOpen(chanPoint *lnrpc.ChannelPoint) error {
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchOpenChannel,
handled: make(chan struct{}),
}

timer := time.After(wait.DefaultTimeout)
Expand Down Expand Up @@ -185,6 +190,7 @@ func (nw *nodeWatcher) WaitForChannelClose(
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchCloseChannel,
handled: make(chan struct{}),
}

timer := time.After(wait.DefaultTimeout)
Expand Down Expand Up @@ -216,7 +222,27 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
timer := time.After(wait.DefaultTimeout)
defer ticker.Stop()

eventChan := make(chan struct{})
// onTimeout is a helper function that will be called in case the
// expected policy is not found before the timeout.
onTimeout := func() error {
expected, err := json.MarshalIndent(policy, "", "\t")
if err != nil {
return fmt.Errorf("encode policy err: %w", err)
}

policies, err := syncMapToJSON(&nw.state.policyUpdates.Map)
if err != nil {
return err
}

return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(), advertisingNode.PubKeyStr,
expected, policies)
}

var eventChan = make(chan struct{})
for {
select {
// Send a watch request every second.
Expand All @@ -230,35 +256,33 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
default:
}

var handled = make(chan struct{})
nw.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchPolicyUpdate,
policy: policy,
advertisingNode: advertisingNode.PubKeyStr,
includeUnannounced: includeUnannounced,
handled: handled,
}

// We wait for the topologyWatcher to signal that
// it has completed the handling of the request so that
// we don't send a new request before the previous one
// has been processed as this could lead to a double
// closure of the eventChan channel.
select {
case <-handled:
case <-timer:
return onTimeout()
}

case <-eventChan:
return nil

case <-timer:
expected, err := json.MarshalIndent(policy, "", "\t")
if err != nil {
return fmt.Errorf("encode policy err: %w", err)
}
policies, err := syncMapToJSON(
&nw.state.policyUpdates.Map,
)
if err != nil {
return err
}

return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(),
advertisingNode.PubKeyStr, expected, policies)
return onTimeout()
}
}
}
Expand Down Expand Up @@ -341,6 +365,10 @@ func (nw *nodeWatcher) topologyWatcher(ctxb context.Context,
nw.handlePolicyUpdateWatchRequest(watchRequest)
}

// Signal to the caller that the request has been
// handled.
close(watchRequest.handled)

case <-ctxb.Done():
return
}
Expand Down
Loading