Skip to content

Commit

Permalink
httpu: add context.Context and related interface
Browse files Browse the repository at this point in the history
This adds a new interface for httpu that supports a Context, and uses
that context to set a deadline/timeout and also cancel the request if
the context is canceled. Additionally, implement the Do method in terms
of the DoWithContext method for the HTTPUClient implementation.

Updates huin#55

Signed-off-by: Andrew Dunham <[email protected]>
  • Loading branch information
andrew-d committed Aug 22, 2023
1 parent 8ca2329 commit 981f6b1
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 4 deletions.
59 changes: 57 additions & 2 deletions httpu/httpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package httpu
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"log"
Expand All @@ -26,6 +27,25 @@ type ClientInterface interface {
) ([]*http.Response, error)
}

// ClientInterfaceCtx is the equivalent of ClientInterface, except with methods
// taking a context.Context parameter.
type ClientInterfaceCtx interface {
// ClientInterfaceCtx embeds and is a superset of ClientInterface.
ClientInterface

// DoWithContext performs a request. If the input request has a
// deadline, then that value will be used as the timeout for how long
// to wait before returning the responses that were received. If the
// request's context is canceled, this method will return immediately.
//
// An error is only returned for failing to send the request. Failures
// in receipt simply do not add to the resulting responses.
DoWithContext(
req *http.Request,
numSends int,
) ([]*http.Response, error)
}

// HTTPUClient is a client for dealing with HTTPU (HTTP over UDP). Its typical
// function is for HTTPMU, and particularly SSDP.
type HTTPUClient struct {
Expand All @@ -34,6 +54,7 @@ type HTTPUClient struct {
}

var _ ClientInterface = &HTTPUClient{}
var _ ClientInterfaceCtx = &HTTPUClient{}

// NewHTTPUClient creates a new HTTPUClient, opening up a new UDP socket for the
// purpose.
Expand Down Expand Up @@ -75,6 +96,22 @@ func (httpu *HTTPUClient) Do(
req *http.Request,
timeout time.Duration,
numSends int,
) ([]*http.Response, error) {
ctx := req.Context()
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
req = req.WithContext(ctx)
}

return httpu.DoWithContext(req, numSends)
}

// DoWithContext implements ClientInterfaceCtx.DoWithContext.
func (httpu *HTTPUClient) DoWithContext(
req *http.Request,
numSends int,
) ([]*http.Response, error) {
httpu.connLock.Lock()
defer httpu.connLock.Unlock()
Expand All @@ -101,10 +138,28 @@ func (httpu *HTTPUClient) Do(
if err != nil {
return nil, err
}
if err = httpu.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
return nil, err

// Handle context deadline/timeout
ctx := req.Context()
deadline, ok := ctx.Deadline()
if ok {
if err = httpu.conn.SetDeadline(deadline); err != nil {
return nil, err
}
}

// Handle context cancelation
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-ctx.Done():
// if context is cancelled, stop any connections by setting time in the past.
httpu.conn.SetDeadline(time.Now().Add(-time.Second))
case <-done:
}
}()

// Send request.
for i := 0; i < numSends; i++ {
if n, err := httpu.conn.WriteTo(requestBuf.Bytes(), destAddr); err != nil {
Expand Down
64 changes: 62 additions & 2 deletions httpu/multiclient.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package httpu

import (
"fmt"
"net/http"
"time"

Expand Down Expand Up @@ -49,14 +50,14 @@ func (mc *MultiClient) Do(
}

func (mc *MultiClient) sendRequests(
results chan<-[]*http.Response,
results chan<- []*http.Response,
req *http.Request,
timeout time.Duration,
numSends int,
) error {
tasks := &errgroup.Group{}
for _, d := range mc.delegates {
d := d // copy for closure
d := d // copy for closure
tasks.Go(func() error {
responses, err := d.Do(req, timeout, numSends)
if err != nil {
Expand All @@ -68,3 +69,62 @@ func (mc *MultiClient) sendRequests(
}
return tasks.Wait()
}

// DoWithContext implements ClientInterfaceCtx.DoWithContext.
//
// If the underlying client(s) do not implement DoWithContext, an error is
// returned before any requests are sent.
func (mc *MultiClient) DoWithContext(
req *http.Request,
numSends int,
) ([]*http.Response, error) {
var ctxDelegates []ClientInterfaceCtx
for _, del := range mc.delegates {
if cd, ok := del.(ClientInterfaceCtx); ok {
ctxDelegates = append(ctxDelegates, cd)
} else {
return nil, fmt.Errorf("client does not implement ClientInterfaceCtx")
}
}

tasks, ctx := errgroup.WithContext(req.Context())
req = req.WithContext(ctx) // so we cancel if the errgroup errors
results := make(chan []*http.Response)

// For each client, send the request to it and collect results.
tasks.Go(func() error {
defer close(results)
return mc.sendRequestsCtx(results, ctxDelegates, req, numSends)
})

var responses []*http.Response
tasks.Go(func() error {
for rs := range results {
responses = append(responses, rs...)
}
return nil
})

return responses, tasks.Wait()
}

func (mc *MultiClient) sendRequestsCtx(
results chan<- []*http.Response,
delegates []ClientInterfaceCtx,
req *http.Request,
numSends int,
) error {
tasks := &errgroup.Group{}
for _, d := range delegates {
d := d // copy for closure
tasks.Go(func() error {
responses, err := d.DoWithContext(req, numSends)
if err != nil {
return err
}
results <- responses
return nil
})
}
return tasks.Wait()
}

0 comments on commit 981f6b1

Please sign in to comment.