diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cc528ec7..7c8250528 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,11 +27,11 @@ The following emojis are used to highlight certain changes: * 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`. * Replaced unmaintained mock time implementation uses in tests: [from](github.com/benbjohnson/clock) => [to](github.com/filecoin-project/go-clock) +* `bitswap/client`: if a libp2p connection has a context, use `context.AfterFunc` to cleanup the connection. ### Removed - ### Fixed * `mfs`: directory cache is now cleared on Flush(), liberating the memory used by the otherwise ever-growing cache. References to directories and sub-directories should be renewed after flushing. diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index 46b4120c0..4a60aaf6b 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -82,9 +82,32 @@ type impl struct { receivers []Receiver } +// interfaceWrapper is concrete type that wraps an interface. Necessary because +// atomic.Value needs the same type and can not Store(nil). This indirection +// allows us to store nil. +type interfaceWrapper[T any] struct { + t T +} +type atomicInterface[T any] struct { + iface atomic.Value +} + +func (a *atomicInterface[T]) Load() T { + var v T + x := a.iface.Load() + if x != nil { + return x.(interfaceWrapper[T]).t + } + return v +} + +func (a *atomicInterface[T]) Store(v T) { + a.iface.Store(interfaceWrapper[T]{v}) +} + type streamMessageSender struct { to peer.ID - stream network.Stream + stream atomicInterface[network.Stream] bsnet *impl opts *MessageSenderOpts } @@ -95,7 +118,7 @@ type HasContext interface { // Open a stream to the remote peer func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) { - stream := s.stream + stream := s.stream.Load() if stream != nil { return stream, nil } @@ -111,17 +134,22 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro if err != nil { return nil, err } + if withCtx, ok := stream.Conn().(HasContext); ok { + context.AfterFunc(withCtx.Context(), func() { + s.stream.Store(nil) + }) + } - s.stream = stream + s.stream.Store(stream) return stream, nil } // Reset the stream func (s *streamMessageSender) Reset() error { - stream := s.stream + stream := s.stream.Load() if stream != nil { err := stream.Reset() - s.stream = nil + s.stream.Store(nil) return err } return nil @@ -129,10 +157,10 @@ func (s *streamMessageSender) Reset() error { // Close the stream func (s *streamMessageSender) Close() error { - stream := s.stream + stream := s.stream.Load() if stream != nil { err := stream.Close() - s.stream = nil + s.stream.Store(nil) return err } return nil @@ -140,7 +168,7 @@ func (s *streamMessageSender) Close() error { // Indicates whether the peer supports HAVE / DONT_HAVE messages func (s *streamMessageSender) SupportsHave() bool { - stream := s.stream + stream := s.stream.Load() if stream == nil { return false }