Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing race condition in LeaserCheckpointer where it can fail with a ContainerAlreadyExists error #253

Merged
merged 9 commits into from
Apr 28, 2022
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## `v3.3.17`

- Fixing issue where the LeaserCheckpointer could fail with a "ContainerAlreadyExists" error. (#253)

## `v3.3.16`

- Exporting a subset of AMQP message properties for the Dapr project.
Expand Down
2 changes: 1 addition & 1 deletion eng/azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
go get github.com/AlekSi/gocov-xml
go get -u github.com/matm/gocov-html
go get -u golang.org/x/lint/golint
go get github.com/fzipp/gocyclo/cmd/gocyclo
go get github.com/fzipp/gocyclo/cmd/gocyclo@v0.3.1
workingDirectory: '$(sdkPath)'
displayName: 'Install Dependencies'
- script: |
Expand Down
10 changes: 5 additions & 5 deletions eng/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ steps:
go get github.com/axw/gocov/gocov
go get github.com/AlekSi/gocov-xml
go get -u github.com/matm/gocov-html
go get github.com/fzipp/gocyclo/cmd/gocyclo
go get github.com/fzipp/gocyclo/cmd/gocyclo@v0.3.1
go get golang.org/x/lint/golint
displayName: 'Install Dependencies'
- script: |
Expand All @@ -47,10 +47,10 @@ steps:
gocov-html < coverage.json > coverage.html
displayName: 'Run Integration Tests'
env:
ARM_SUBSCRIPTION_ID: $(go-live-azure-subscription-id)
ARM_CLIENT_ID: $(go-live-eh-azure-client-id)
ARM_CLIENT_SECRET: $(go-live-eh-azure-client-secret)
ARM_TENANT_ID: $(go-live-tenant-id)
ARM_SUBSCRIPTION_ID: $(azure-subscription-id)
ARM_CLIENT_ID: $(aad-azure-sdk-test-client-id)
ARM_CLIENT_SECRET: $(aad-azure-sdk-test-client-secret)
ARM_TENANT_ID: $(aad-azure-sdk-test-tenant-id)

- task: PublishTestResults@2
inputs:
Expand Down
11 changes: 5 additions & 6 deletions eph/leasedReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/devigned/tab"

"github.com/Azure/azure-event-hubs-go/v3"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

type (
Expand All @@ -58,11 +58,10 @@ func (lr *leasedReceiver) Run(ctx context.Context) error {
epoch := lr.lease.GetEpoch()
lr.dlog(ctx, "running...")

go func() {
ctx, done := context.WithCancel(context.Background())
lr.done = done
lr.periodicallyRenewLease(ctx)
}()
renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
lr.done = cancelRenewLease

go lr.periodicallyRenewLease(renewLeaseCtx)

opts := []eventhub.ReceiveOption{eventhub.ReceiveWithEpoch(epoch)}
if lr.processor.consumerGroup != "" {
Expand Down
39 changes: 27 additions & 12 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"

"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"

"github.com/Azure/azure-event-hubs-go/v3"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/persist"

Expand Down Expand Up @@ -144,20 +145,22 @@ func (sl *LeaserCheckpointer) StoreExists(ctx context.Context) (bool, error) {
span, ctx := startConsumerSpanFromContext(ctx, "storage.LeaserCheckpointer.StoreExists")
defer span.End()

opts := azblob.ListContainersSegmentOptions{
Prefix: sl.containerName,
}
res, err := sl.serviceURL.ListContainersSegment(ctx, azblob.Marker{}, opts)
if err != nil {
return false, err
containerURL := sl.serviceURL.NewContainerURL(sl.containerName)
_, err := containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{})

if err == nil {
return true, nil
}

for _, container := range res.ContainerItems {
if container.Name == sl.containerName {
return true, nil
var respErr azblob.ResponseError

if errors.As(err, &respErr) {
if respErr.Response().StatusCode == http.StatusNotFound {
return false, nil
}
}
return false, nil

return false, err
}

// EnsureStore creates the container if it does not exist
Expand All @@ -175,9 +178,21 @@ func (sl *LeaserCheckpointer) EnsureStore(ctx context.Context) error {
if !ok {
containerURL := sl.serviceURL.NewContainerURL(sl.containerName)
_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)

if err != nil {
return err
var storageErr azblob.StorageError

if errors.As(err, &storageErr) {
// we're okay if the container has been created - we're basically racing against
// other LeaserCheckpointers.
if storageErr.ServiceCode() != azblob.ServiceCodeContainerAlreadyExists {
return err
}
} else {
return err
}
}

sl.containerURL = &containerURL
}
return nil
Expand Down
35 changes: 35 additions & 0 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ package storage
import (
"context"
"strings"
"sync"
"time"

"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/stretchr/testify/assert"

"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/internal/test"
)

const (
Expand Down Expand Up @@ -64,6 +66,35 @@ func (ts *testSuite) TestLeaserStoreCreation() {
ts.True(exists)
}

// TestLeaserStoreCreationConcurrent checks that EnsureStore succeeds when many
// leasers race to create the same storage container.
func (ts *testSuite) TestLeaserStoreCreationConcurrent() {
	const leaserCount = 100

	containerName := test.RandomString("concurrent-container", 4)

	// Build every leaser on the test goroutine. newLeaserWithContainerName
	// asserts through ts.Require(), which stops the test via FailNow, and
	// FailNow is only valid on the goroutine that runs the test.
	leasers := make([]*LeaserCheckpointer, leaserCount)
	for i := range leasers {
		// Only the func returned for the final leaser below is deferred; the
		// ones returned here are discarded (presumably they would all clean up
		// the same container - confirm).
		leasers[i], _ = ts.newLeaserWithContainerName(containerName)
	}

	// Each goroutine writes to its own slot, so no locking is needed and the
	// assertions can run on the test goroutine once all of them have finished.
	errs := make([]error, leaserCount)

	var wg sync.WaitGroup
	for i, leaser := range leasers {
		wg.Add(1)

		go func(i int, leaser *LeaserCheckpointer) {
			defer wg.Done()

			// do a simple test that ensures we don't die just because we raced
			// with other leasers to create the storage container.
			errs[i] = leaser.EnsureStore(context.Background())
		}(i, leaser)
	}

	wg.Wait()

	// Register the deferred func before any fatal assertion so it still runs
	// when one of the EnsureStore calls failed.
	leaser, del := ts.newLeaserWithContainerName(containerName)
	defer del()

	for _, err := range errs {
		ts.Require().NoError(err)
	}

	exists, err := leaser.StoreExists(context.Background())
	ts.NoError(err)
	ts.True(exists)
}

func (ts *testSuite) TestLeaserLeaseEnsure() {
leaser, del := ts.leaserWithEPH()
defer del()
Expand Down Expand Up @@ -189,6 +220,10 @@ func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {

// newLeaser returns the result of newLeaserWithContainerName for a container
// name produced by lower-casing ts.RandomName("stortest", 4).
func (ts *testSuite) newLeaser() (*LeaserCheckpointer, func()) {
	return ts.newLeaserWithContainerName(strings.ToLower(ts.RandomName("stortest", 4)))
}

func (ts *testSuite) newLeaserWithContainerName(containerName string) (*LeaserCheckpointer, func()) {
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
leaser, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package eventhub

const (
// Version is the semantic version number
Version = "3.3.13"
Version = "3.3.17"
)