From 27b5c9a3da7b7c0e75d1905c84a9035479296870 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Mon, 3 Mar 2025 17:24:15 +0200 Subject: [PATCH] lntest: wait for ChanUpdate req to be fully processed before sending another Before this commit, it was possible for a request to be sent on the `chanWatchRequests` channel in `WaitForChannelPolicyUpdate` and then for the `ticker.C` case to be selected _before_ the `eventChan` case gets triggered when the `topologyWatcher` closes the `eventChan` in its call to `handlePolicyUpdateWatchRequest`. This could lead to a "close of a closed channel" panic. To fix this, this commit ensures that we only move on to the next iteration of the select statement in `WaitForChannelPolicyUpdate` once the request sent on `chanWatchRequests` has been fully handled. --- docs/release-notes/release-notes-0.19.0.md | 3 + lntest/node/watcher.go | 64 ++++++++++++++++------ 2 files changed, 50 insertions(+), 17 deletions(-) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 118c1fde5d..d974d89b5a 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -323,6 +323,9 @@ The underlying functionality between those two options remain the same. * Add a new CI-step to do some basic [backwards compatibility testing](https://github.com/lightningnetwork/lnd/pull/9540) for each PR. +* [Fix](https://github.com/lightningnetwork/lnd/pull/9574) an integration test + flake that could lead to a "close of a closed channel" panic. 
+ ## Database * [Migrate the mission control diff --git a/lntest/node/watcher.go b/lntest/node/watcher.go index f7672b825a..031f99aa3c 100644 --- a/lntest/node/watcher.go +++ b/lntest/node/watcher.go @@ -45,6 +45,10 @@ type chanWatchRequest struct { advertisingNode string policy *lnrpc.RoutingPolicy includeUnannounced bool + + // handled is a channel that will be closed once the request has been + // handled by the topologyWatcher goroutine. + handled chan struct{} } // nodeWatcher is a topology watcher for a HarnessNode. It keeps track of all @@ -154,6 +158,7 @@ func (nw *nodeWatcher) WaitForChannelOpen(chanPoint *lnrpc.ChannelPoint) error { chanPoint: op, eventChan: eventChan, chanWatchType: watchOpenChannel, + handled: make(chan struct{}), } timer := time.After(wait.DefaultTimeout) @@ -185,6 +190,7 @@ func (nw *nodeWatcher) WaitForChannelClose( chanPoint: op, eventChan: eventChan, chanWatchType: watchCloseChannel, + handled: make(chan struct{}), } timer := time.After(wait.DefaultTimeout) @@ -216,7 +222,30 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate( timer := time.After(wait.DefaultTimeout) defer ticker.Stop() - eventChan := make(chan struct{}) + // onTimeout is a helper function that will be called in case the + // expected policy is not found before the timeout. + onTimeout := func() error { + expected, err := json.MarshalIndent(policy, "", "\t") + if err != nil { + return fmt.Errorf("encode policy err: %w", err) + } + + policies, err := syncMapToJSON(&nw.state.policyUpdates.Map) + if err != nil { + return err + } + + return fmt.Errorf("policy not updated before timeout:"+ + "\nchannel: %v \nadvertisingNode: %s:%v"+ + "\nwant policy:%s\nhave updates:%s", op, + advertisingNode.Name(), advertisingNode.PubKeyStr, + expected, policies) + } + + var ( + eventChan = make(chan struct{}) + handled = make(chan struct{}) + ) for { select { // Send a watch request every second. 
@@ -237,28 +266,25 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate( policy: policy, advertisingNode: advertisingNode.PubKeyStr, includeUnannounced: includeUnannounced, + handled: handled, + } + + // We wait for the topologyWatcher to signal that + // it has completed the handling of the request so that + // we don't send a new request before the previous one + // has been processed as this could lead to a double + // closure of the eventChan channel. + select { + case <-handled: + case <-timer: + return onTimeout() } case <-eventChan: return nil case <-timer: - expected, err := json.MarshalIndent(policy, "", "\t") - if err != nil { - return fmt.Errorf("encode policy err: %w", err) - } - policies, err := syncMapToJSON( - &nw.state.policyUpdates.Map, - ) - if err != nil { - return err - } - - return fmt.Errorf("policy not updated before timeout:"+ - "\nchannel: %v \nadvertisingNode: %s:%v"+ - "\nwant policy:%s\nhave updates:%s", op, - advertisingNode.Name(), - advertisingNode.PubKeyStr, expected, policies) + return onTimeout() } } } @@ -341,6 +367,10 @@ func (nw *nodeWatcher) topologyWatcher(ctxb context.Context, nw.handlePolicyUpdateWatchRequest(watchRequest) } + // Signal to the caller that the request has been + // handled. + close(watchRequest.handled) + case <-ctxb.Done(): return }