Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest azure-amqp-common-go and go-amqp modules #287

Merged
merged 3 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# Change Log

## `v3.5.0`

- Updated to latest `azure-amqp-common-go` and `go-amqp` modules.

## `v3.4.1`

- Updated `golang.org/x/net` to the latest version. [#286](https://github.com/Azure/azure-event-hubs-go/pull/286)

## `v3.4.0`

-Updated to latest `azure-amqp-common-go` and `go-amqp` modules.
- Updated to latest `azure-amqp-common-go` and `go-amqp` modules.

## `v3.3.20`

Expand Down
8 changes: 5 additions & 3 deletions eph/leasedReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
select {
case <-ctx.Done():
return
default:
skew := time.Duration(rand.Intn(1000)-500) * time.Millisecond
time.Sleep(DefaultLeaseRenewalInterval + skew)
case <-time.After(DefaultLeaseRenewalInterval + (time.Duration(rand.Intn(1000)-500) * time.Millisecond)):
err := lr.tryRenew(ctx)
if err != nil {
tab.For(ctx).Error(err)
lease := lr.getLease()
// the passed in context gets cancelled when we want the periodic lease renewal to stop.
// we can't pass it to stopReceiver() as that's guaranteed to not work.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_ = lr.processor.scheduler.stopReceiver(ctx, lease)
cancel()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion eph/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/devigned/tab"
)

var (
const (
timeout = 60 * time.Second
)

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/Azure/azure-event-hubs-go/v3
go 1.18

require (
github.com/Azure/azure-amqp-common-go/v4 v4.0.0
github.com/Azure/azure-amqp-common-go/v4 v4.1.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/go-amqp v0.18.0
github.com/Azure/go-amqp v0.19.1
github.com/Azure/go-autorest/autorest v0.11.28
github.com/Azure/go-autorest/autorest/adal v0.9.21
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2
Expand Down Expand Up @@ -36,7 +36,7 @@ require (
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
Expand Down
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
github.com/Azure/azure-amqp-common-go/v4 v4.0.0 h1:mV5O74KYmonn0ZXtwfMjGUtZ9Z+Hv7AIFVS1s03sRvo=
github.com/Azure/azure-amqp-common-go/v4 v4.0.0/go.mod h1:4+qRvizIo4+CbGG552O6a8ONkEwRgWXqes3SUt1Ftrc=
github.com/Azure/azure-amqp-common-go/v4 v4.1.0 h1:gcS6P4q/Qv1nmdq1IWoU3mLYlHnvNxAhVjxReEFmSz8=
github.com/Azure/azure-amqp-common-go/v4 v4.1.0/go.mod h1:HDiTPilyFCWPOT8dBeSjGztqgrW27LctWs/4p6nR9FY=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-amqp v0.18.0 h1:95bTiJq0oxjK1RUlt5T3HF/THj6jWTRZpSXMPSOJLz8=
github.com/Azure/go-amqp v0.18.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-amqp v0.19.1 h1:S1l3HiSMU7Rhko2f70lBH6Vd0mLj5UZiTWC6xKY5Kho=
github.com/Azure/go-amqp v0.19.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
Expand Down Expand Up @@ -97,8 +97,9 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down
10 changes: 5 additions & 5 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetRuntimeInformation")
defer span.End()
client := newClient(h.namespace, h.name)
c, err := h.namespace.newConnection()
c, err := h.namespace.newConnection(ctx)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
Expand All @@ -525,7 +525,7 @@ func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (
span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation")
defer span.End()
client := newClient(h.namespace, h.name)
c, err := h.namespace.newConnection()
c, err := h.namespace.newConnection(ctx)
if err != nil {
tab.For(ctx).Error(err)
return nil, err
Expand Down Expand Up @@ -776,9 +776,9 @@ func (h *Hub) getSender(ctx context.Context) (*sender, error) {
}

func isRecoverableCloseError(err error) bool {
var detachError *amqp.DetachError
// an *amqp.DetachError with a nil RemoteErr means that the link was closed client-side
return isConnectionClosed(err) || isSessionClosed(err) || (errors.As(err, &detachError) && detachError.RemoteErr != nil)
var linkError *amqp.LinkError
// an *amqp.LinkError with a nil RemoteErr means that the link was closed client-side
return isConnectionClosed(err) || isSessionClosed(err) || (errors.As(err, &linkError) && linkError.RemoteErr != nil)
}

func isConnectionClosed(err error) bool {
Expand Down
4 changes: 2 additions & 2 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ func TestNewHub_withAzureEnvironmentVariable(t *testing.T) {
}

func TestIsRecoverableCloseError(t *testing.T) {
require.True(t, isRecoverableCloseError(&amqp.DetachError{RemoteErr: &amqp.Error{}}))
require.True(t, isRecoverableCloseError(&amqp.LinkError{RemoteErr: &amqp.Error{}}))

// if the caller closes a link we shouldn't reopen or create a new one to replace it
require.False(t, isRecoverableCloseError(&amqp.DetachError{}))
require.False(t, isRecoverableCloseError(&amqp.LinkError{}))
}
6 changes: 3 additions & 3 deletions namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newNamespace(opts ...namespaceOption) (*namespace, error) {
return ns, nil
}

func (ns *namespace) newConnection() (*amqp.Conn, error) {
func (ns *namespace) newConnection(ctx context.Context) (*amqp.Conn, error) {
host := ns.getAmqpsHostURI()

defaultConnOptions := amqp.ConnOptions{
Expand All @@ -112,10 +112,10 @@ func (ns *namespace) newConnection() (*amqp.Conn, error) {

wssConn.PayloadType = websocket.BinaryFrame
defaultConnOptions.HostName = trimmedHost
return amqp.NewConn(wssConn, &defaultConnOptions)
return amqp.NewConn(ctx, wssConn, &defaultConnOptions)
}

return amqp.Dial(host, &defaultConnOptions)
return amqp.Dial(ctx, host, &defaultConnOptions)
}

func (ns *namespace) negotiateClaim(ctx context.Context, conn *amqp.Conn, entityPath string) error {
Expand Down
10 changes: 5 additions & 5 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ func (r *receiver) listenForMessages(ctx context.Context, msgChan chan *amqp.Mes
tab.For(ctx).Debug("context done")
return
default:
var detachErr *amqp.DetachError
if errors.As(err, &detachErr) && detachErr.RemoteErr != nil && detachErr.RemoteErr.Condition == "amqp:link:stolen" {
var linkError *amqp.LinkError
if errors.As(err, &linkError) && linkError.RemoteErr != nil && linkError.RemoteErr.Condition == "amqp:link:stolen" {
tab.For(ctx).Debug("link has been stolen by a higher epoch")
_ = r.Close(ctx)
return
Expand Down Expand Up @@ -349,7 +349,7 @@ func (r *receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
span, ctx := r.startConsumerSpanFromContext(ctx, "eh.receiver.listenForMessage")
defer span.End()

msg, err := r.receiver.Receive(ctx)
msg, err := r.receiver.Receive(ctx, nil)
if err != nil {
tab.For(ctx).Debug(err.Error())
return nil, err
Expand All @@ -367,7 +367,7 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
span, ctx := r.startConsumerSpanFromContext(ctx, "eh.receiver.newSessionAndLink")
defer span.End()

connection, err := r.hub.namespace.newConnection()
connection, err := r.hub.namespace.newConnection(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -402,7 +402,7 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {
}

opts := amqp.ReceiverOptions{
Credit: r.prefetchCount,
Credit: int32(r.prefetchCount),
SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(),
Filters: []amqp.LinkFilter{amqp.NewSelectorFilter(offsetExpression)},
}
Expand Down
7 changes: 3 additions & 4 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type (
// Implemented by *amqp.Sender
amqpSender interface {
LinkName() string
Send(ctx context.Context, msg *amqp.Message) error
Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error
Close(ctx context.Context) error
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func sendMessage(ctx context.Context, getAmqpSender getAmqpSender, maxRetries in
return ctx.Err()
default:
sender := getAmqpSender()
err := sender.Send(ctx, msg)
err := sender.Send(ctx, msg, nil)
if err == nil {
return err
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
span, ctx := s.startProducerSpanFromContext(ctx, "eh.sender.newSessionAndLink")
defer span.End()

connection, err := s.hub.namespace.newConnection()
connection, err := s.hub.namespace.newConnection(ctx)
if err != nil {
tab.For(ctx).Error(err)
return err
Expand All @@ -366,7 +366,6 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {
}

amqpSender, err := amqpSession.NewSender(ctx, s.getAddress(), &amqp.SenderOptions{
IgnoreDispositionErrors: true,
SettlementMode: amqp.SenderSettleModeMixed.Ptr(),
RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(),
})
Expand Down
22 changes: 11 additions & 11 deletions sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *testAmqpSender) LinkName() string {
return "sender-id"
}

func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message) error {
func (s *testAmqpSender) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
var err error

if len(s.sendErrors) > s.sendCount {
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.SessionError{},
errors.New("We'll never attempt to use this one since we ran out of retries")},
}
Expand All @@ -85,7 +85,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
Expand Down Expand Up @@ -117,14 +117,14 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{}, // this is no longer considered a retryable error (ErrLinkDetached is, however)
&amqp.LinkError{}, // this is no longer considered a retryable error (ErrLinkDetached is, however)
},
}

actualErr := sendMessage(context.TODO(), getAmqpSender, 5, nil, recover)

var detachErr *amqp.DetachError
assert.ErrorAs(t, actualErr, &detachErr)
var linkErr *amqp.LinkError
assert.ErrorAs(t, actualErr, &linkErr)
assert.EqualValues(t, 1, sender.sendCount)
assert.Empty(t, recoverCalls, "No recovery attempts should happen for non-recoverable errors")
})
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestSenderRetries(t *testing.T) {
recoverCalls = nil
sender = &testAmqpSender{
sendErrors: []error{
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&net.DNSError{},
},
}
Expand All @@ -188,7 +188,7 @@ func TestSenderRetries(t *testing.T) {
assert.EqualValues(t, []recoveryCall{
{
linkID: "sender-id",
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
Expand All @@ -204,7 +204,7 @@ func TestSenderRetries(t *testing.T) {
sender = &testAmqpSender{
sendErrors: []error{
&amqp.ConnError{},
&amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
&amqp.SessionError{},
},
}
Expand All @@ -220,7 +220,7 @@ func TestSenderRetries(t *testing.T) {
},
{
linkID: "sender-id",
err: &amqp.DetachError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
err: &amqp.LinkError{RemoteErr: &amqp.Error{Condition: amqp.ErrCondDetachForced}},
recover: true,
},
{
Expand Down Expand Up @@ -369,7 +369,7 @@ func (s *fakeSender) LinkName() string {
return "the-actual-link-id"
}

func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message, opts *amqp.SendOptions) error {
return nil
}
func (s *fakeSender) Close(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package eventhub

const (
// Version is the semantic version number
Version = "3.4.0"
Version = "3.5.0"
)