Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add streaming methods to BlobStore #120

Merged
merged 1 commit into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion internal/pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,31 @@

package blob

import "context"
import (
"context"
"io"
)

// BlobStore is a public interface for Blob operations within Triton.
// Use one of the structs defined in the package.
// Currently available drivers:
// - BlobGCP: Google Cloud Storage
type BlobStore interface {
Put(ctx context.Context, path string, data []byte) error

// NewWriter creates a new object with path and returns an io.WriteCloser
// instance for the object.
// Make sure to close the writer after all operations to the writer.
NewWriter(ctx context.Context, path string) (io.WriteCloser, error)
Get(ctx context.Context, path string) ([]byte, error)

// NewReader is an alias to NewRangeReader(ctx, path, 0, -1), which creates
// a reader from the beginning of an object to EOF.
NewReader(ctx context.Context, path string) (io.ReadCloser, error)

// NewRangeReader returns an io.ReadCloser instance for the object specified by path,
// beginning at the offset-th byte and length bytes long. length = -1 means until EOF.
// Make sure to close the reader after all operations to the reader.
NewRangeReader(ctx context.Context, path string, offset, length int64) (io.ReadCloser, error)
Delete(ctx context.Context, path string) error
}
31 changes: 31 additions & 0 deletions internal/pkg/blob/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ package blob
import (
"context"
"fmt"
"io"

"gocloud.dev/blob"
// Register the gocloud blob GCS driver
_ "gocloud.dev/blob/gcsblob"
)

type BlobGCP struct {
bucket *blob.Bucket
}

// Assert BlobGCP implements the Blob interface
var _ BlobStore = new(BlobGCP)

func NewBlobGCP(bucketURL string) (*BlobGCP, error) {
ctx := context.Background()
// blob.OpenBucket creates a *blob.Bucket from url.
Expand All @@ -48,13 +53,39 @@ func (b *BlobGCP) Put(ctx context.Context, path string, data []byte) error {
return b.bucket.WriteAll(ctx, path, data, nil)
}

// NewWriter creates a new object with path and returns an io.WriteCloser
// instance for the object. The object is not committed and visible until
// you close the writer.
func (b *BlobGCP) NewWriter(ctx context.Context, path string) (io.WriteCloser, error) {
if b.bucket == nil {
return nil, fmt.Errorf("could not find bucket for storage provider")
}
return b.bucket.NewWriter(ctx, path, nil)
}

func (b *BlobGCP) Get(ctx context.Context, path string) ([]byte, error) {
if b.bucket == nil {
return []byte{}, fmt.Errorf("could not find bucket for storage provider")
}
return b.bucket.ReadAll(ctx, path)
}

// NewReader is an alias to NewRangeReader(ctx, path, 0, -1), which creates
// a reader from the beginning of an object to EOF.
func (b *BlobGCP) NewReader(ctx context.Context, path string) (io.ReadCloser, error) {
return b.NewRangeReader(ctx, path, 0, -1)
}

// NewRangeReader returns an io.ReadCloser instance for the object specified by path,
// beginning at the offset-th byte and length bytes long. length = -1 means until EOF.
// Make sure to close the reader after all operations to the reader.
func (b *BlobGCP) NewRangeReader(ctx context.Context, path string, offset, length int64) (io.ReadCloser, error) {
if b.bucket == nil {
return nil, fmt.Errorf("could not find bucket for storage provider")
}
return b.bucket.NewRangeReader(ctx, path, offset, length, nil)
}

func (b *BlobGCP) Delete(ctx context.Context, path string) error {
if b.bucket == nil {
return fmt.Errorf("could not find bucket for storage provider")
Expand Down
59 changes: 59 additions & 0 deletions internal/pkg/blob/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package blob
import (
"bytes"
"context"
"io"
"testing"

"github.com/stretchr/testify/assert"
)

func getBucket(t *testing.T) *BlobGCP {
Expand Down Expand Up @@ -83,3 +86,59 @@ func TestGCS_Delete(t *testing.T) {
t.Fatalf("Get should fail after file has been deleted, got nil")
}
}

func TestGCS_SimpleStreamTests(t *testing.T) {
ctx := context.Background()
gcs := getBucket(t)
filePath := "simple-stream-tests.txt"
testBlob := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit")

writer, err := gcs.NewWriter(ctx, filePath)
if err != nil {
t.Fatalf("NewWriter(%q) in GCS got error: %v", filePath, err)
}
assert.NotNil(t, writer)
n, err := writer.Write(testBlob[:10])
assert.NoError(t, err)
assert.Equal(t, 10, n)
n, err = writer.Write(testBlob[10:])
assert.NoError(t, err)
assert.Equal(t, len(testBlob)-10, n)
if err := writer.Close(); err != nil {
t.Fatalf("Failed to close writer: %v", err)
}
t.Cleanup(func() { assert.NoError(t, gcs.Delete(ctx, filePath)) })

reader, err := gcs.NewReader(ctx, filePath)
if err != nil {
t.Fatalf("NewReader(%q) in GCS got error: %v", filePath, err)
}
assert.NotNil(t, reader)
readBuf := make([]byte, 1)
n, err = reader.Read(readBuf)
assert.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, testBlob[:1], readBuf)
readBuf = make([]byte, 100)
n, err = reader.Read(readBuf)
assert.NoError(t, err)
assert.Equal(t, len(testBlob)-1, n)
assert.Equal(t, testBlob[1:], readBuf[:n])
n, err = reader.Read(readBuf)
assert.Zero(t, n)
assert.Equal(t, io.EOF, err)
assert.NoError(t, reader.Close())

rangeReader, err := gcs.NewRangeReader(ctx, filePath, 3, 5)
if err != nil {
t.Fatalf("NewRangeReader(%q, %q, %q) in GCS got error: %v", filePath, 3, 5, err)
}
n, err = rangeReader.Read(readBuf)
assert.NoError(t, err)
assert.Equal(t, 5, n)
assert.Equal(t, testBlob[3:3+5], readBuf[:5])
n, err = rangeReader.Read(readBuf)
assert.Zero(t, n)
assert.Equal(t, io.EOF, err)
assert.NoError(t, rangeReader.Close())
}