Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Marco/with after func #763

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ The following emojis are used to highlight certain changes:

* 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`.
* Replaced unmaintained mock time implementation uses in tests: [from](github.com/benbjohnson/clock) => [to](github.com/filecoin-project/go-clock)
* `bitswap/client`: if a libp2p connection has a context, use `context.AfterFunc` to cleanup the connection.

### Removed



### Fixed

* `mfs`: directory cache is now cleared on Flush(), liberating the memory used by the otherwise ever-growing cache. References to directories and sub-directories should be renewed after flushing.
Expand Down
44 changes: 36 additions & 8 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,32 @@
receivers []Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
// atomic.Value needs the same type and can not Store(nil). This indirection
// allows us to store nil.
type interfaceWrapper[T any] struct {
t T
}
type atomicInterface[T any] struct {
iface atomic.Value
}

func (a *atomicInterface[T]) Load() T {
var v T
x := a.iface.Load()
if x != nil {
return x.(interfaceWrapper[T]).t
}
return v
}

func (a *atomicInterface[T]) Store(v T) {
a.iface.Store(interfaceWrapper[T]{v})
}

type streamMessageSender struct {
to peer.ID
stream network.Stream
stream atomicInterface[network.Stream]
bsnet *impl
opts *MessageSenderOpts
}
Expand All @@ -95,7 +118,7 @@

// Open a stream to the remote peer
func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) {
stream := s.stream
stream := s.stream.Load()
if stream != nil {
return stream, nil
}
Expand All @@ -111,36 +134,41 @@
if err != nil {
return nil, err
}
if withCtx, ok := stream.Conn().(HasContext); ok {
context.AfterFunc(withCtx.Context(), func() {
s.stream.Store(nil)
})

Check warning on line 140 in bitswap/network/ipfs_impl.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/ipfs_impl.go#L138-L140

Added lines #L138 - L140 were not covered by tests
}

s.stream = stream
s.stream.Store(stream)
return stream, nil
}

// Reset the stream
func (s *streamMessageSender) Reset() error {
stream := s.stream
stream := s.stream.Load()
if stream != nil {
err := stream.Reset()
s.stream = nil
s.stream.Store(nil)
return err
}
return nil
}

// Close the stream
func (s *streamMessageSender) Close() error {
stream := s.stream
stream := s.stream.Load()
if stream != nil {
err := stream.Close()
s.stream = nil
s.stream.Store(nil)
return err
}
return nil
}

// Indicates whether the peer supports HAVE / DONT_HAVE messages
func (s *streamMessageSender) SupportsHave() bool {
stream := s.stream
stream := s.stream.Load()
if stream == nil {
return false
}
Expand Down
Loading