From d5579e4a007b2f29081eb2bbd6b84bbc897c3fa5 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 18 Dec 2024 09:11:16 -0800 Subject: [PATCH 1/3] Revert "remove atomic" This reverts commit ee0082a44a67efb23b01bb6a9c10c8644cb4231e. --- bitswap/network/ipfs_impl.go | 39 ++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index 46b4120c0..dcc8849da 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -82,9 +82,32 @@ type impl struct { receivers []Receiver } +// interfaceWrapper is concrete type that wraps an interface. Necessary because +// atomic.Value needs the same type and can not Store(nil). This indirection +// allows us to store nil. +type interfaceWrapper[T any] struct { + t T +} +type atomicInterface[T any] struct { + iface atomic.Value +} + +func (a *atomicInterface[T]) Load() T { + var v T + x := a.iface.Load() + if x != nil { + return x.(interfaceWrapper[T]).t + } + return v +} + +func (a *atomicInterface[T]) Store(v T) { + a.iface.Store(interfaceWrapper[T]{v}) +} + type streamMessageSender struct { to peer.ID - stream network.Stream + stream atomicInterface[network.Stream] bsnet *impl opts *MessageSenderOpts } @@ -95,7 +118,7 @@ type HasContext interface { // Open a stream to the remote peer func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) { - stream := s.stream + stream := s.stream.Load() if stream != nil { return stream, nil } @@ -112,16 +135,16 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro return nil, err } - s.stream = stream + s.stream.Store(stream) return stream, nil } // Reset the stream func (s *streamMessageSender) Reset() error { - stream := s.stream + stream := s.stream.Load() if stream != nil { err := stream.Reset() - s.stream = nil + s.stream.Store(nil) return err } return nil @@ -129,10 +152,10 @@ func (s *streamMessageSender) Reset() error { // Close the stream func (s *streamMessageSender) Close() error { - stream := s.stream + stream := s.stream.Load() if stream != nil { err := stream.Close() - s.stream = nil + s.stream.Store(nil) return err } return nil @@ -140,7 +163,7 @@ func (s *streamMessageSender) Close() error { // Indicates whether the peer supports HAVE / DONT_HAVE messages func (s *streamMessageSender) SupportsHave() bool { - stream := s.stream + stream := s.stream.Load() if stream == nil { return false } From f7d0ea702bd1c5f67ccee845073380e70e48aaf5 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 16 Dec 2024 15:03:45 -0800 Subject: [PATCH 2/3] refactor: if the conn has a context, use afterfunc for cleanup --- bitswap/network/ipfs_impl.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index dcc8849da..4a60aaf6b 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -134,6 +134,11 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro if err != nil { return nil, err } + if withCtx, ok := stream.Conn().(HasContext); ok { + context.AfterFunc(withCtx.Context(), func() { + s.stream.Store(nil) + }) + } s.stream.Store(stream) return stream, nil From 48971b917dea847db7376b211846609b0bbe8d48 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 18 Dec 2024 07:35:34 -1000 Subject: [PATCH 3/3] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cc528ec7..7c8250528 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,11 +27,11 @@ The following emojis are used to highlight certain changes: * 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`. * Replaced unmaintained mock time implementation uses in tests: [from](github.com/benbjohnson/clock) => [to](github.com/filecoin-project/go-clock) +* `bitswap/client`: if a libp2p connection has a context, use `context.AfterFunc` to cleanup the connection. ### Removed - ### Fixed * `mfs`: directory cache is now cleared on Flush(), liberating the memory used by the otherwise ever-growing cache. References to directories and sub-directories should be renewed after flushing.