Skip to content

Commit

Permalink
fix: Drop stream references on Close/Reset
Browse files Browse the repository at this point in the history
Lets connections get GC'd sooner (possibly before the
streamMessageSender is GC'd)
  • Loading branch information
MarcoPolo authored and gammazero committed Dec 18, 2024
1 parent 2a5c7d0 commit ecbf66e
Showing 1 changed file with 52 additions and 15 deletions.
67 changes: 52 additions & 15 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,45 @@ type impl struct {
receivers []Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
// atomic.Value needs the same type and can not Store(nil). This indirection
// allows us to store nil.
type interfaceWrapper[T any] struct {
t T
}
type atomicInterface[T any] struct {
iface atomic.Value
}

func (a *atomicInterface[T]) Load() T {
var v T
x := a.iface.Load()
if x != nil {
return x.(interfaceWrapper[T]).t
}
return v
}

func (a *atomicInterface[T]) Store(v T) {
a.iface.Store(interfaceWrapper[T]{v})
}

type streamMessageSender struct {
to peer.ID
stream network.Stream
connected bool
bsnet *impl
opts *MessageSenderOpts
to peer.ID
stream atomicInterface[network.Stream]
bsnet *impl
opts *MessageSenderOpts
}

type HasContext interface {
Context() context.Context
}

// Open a stream to the remote peer
func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) {
if s.connected {
return s.stream, nil
stream := s.stream.Load()
if stream != nil {
return stream, nil
}

tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout)
Expand All @@ -108,29 +135,39 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro
return nil, err
}

s.stream = stream
s.connected = true
return s.stream, nil
s.stream.Store(stream)
return stream, nil
}

// Reset the stream
func (s *streamMessageSender) Reset() error {
if s.stream != nil {
err := s.stream.Reset()
s.connected = false
stream := s.stream.Load()
if stream != nil {
err := stream.Reset()
s.stream.Store(nil)
return err
}
return nil
}

// Close the stream
func (s *streamMessageSender) Close() error {
return s.stream.Close()
stream := s.stream.Load()
if stream != nil {
err := stream.Close()
s.stream.Store(nil)
return err
}
return nil
}

// Indicates whether the peer supports HAVE / DONT_HAVE messages
func (s *streamMessageSender) SupportsHave() bool {
return s.bsnet.SupportsHave(s.stream.Protocol())
stream := s.stream.Load()
if stream == nil {
return false
}
return s.bsnet.SupportsHave(stream.Protocol())
}

// Send a message to the peer, attempting multiple times
Expand Down

0 comments on commit ecbf66e

Please sign in to comment.