feat: support CRC32C and MD5 checksums #309

Merged 2 commits on Oct 1, 2021
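Both checksums added by this change are available in the Go standard library — MD5 via crypto/md5 and CRC32C via hash/crc32 with the Castagnoli polynomial, which is exactly what the new tests below use for verification. A minimal, self-contained sketch, independent of the PR's internal checksums package:

package main

import (
	"crypto/md5"
	"fmt"
	"hash/crc32"
)

func main() {
	content := []byte("example blob content")

	// MD5 digest of the whole blob (16 bytes).
	md5Sum := md5.Sum(content)

	// CRC32C is CRC-32 with the Castagnoli polynomial, not the IEEE default.
	crc32c := crc32.Checksum(content, crc32.MakeTable(crc32.Castagnoli))

	fmt.Printf("MD5:    %x\n", md5Sum)
	fmt.Printf("CRC32C: %d\n", crc32c)
}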
39 changes: 33 additions & 6 deletions internal/app/server/open_saves.go
@@ -28,6 +28,7 @@ import (
"github.com/googleforgames/open-saves/internal/pkg/metadb"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref/chunkref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/checksums"
"github.com/googleforgames/open-saves/internal/pkg/metadb/record"
"github.com/googleforgames/open-saves/internal/pkg/metadb/store"
log "github.com/sirupsen/logrus"
@@ -253,10 +254,19 @@ func (s *openSavesServer) insertInlineBlob(ctx context.Context, stream pb.OpenSa
)
}
blob := buffer.Bytes()
digest := checksums.NewDigest()
digest.Write(blob)
checksums := digest.Checksums()
if err := checksums.ValidateIfPresent(meta); err != nil {
log.Error(err)
return err
}
// UpdateRecord also marks any associated external blob for deletion.
record, err := s.metaDB.UpdateRecord(ctx, meta.GetStoreKey(), meta.GetRecordKey(),
func(record *record.Record) (*record.Record, error) {
record.Blob = blob
record.BlobSize = size
record.Checksums = checksums
return record, nil
})
if err != nil {
@@ -302,6 +312,7 @@ func (s *openSavesServer) insertExternalBlob(ctx context.Context, stream pb.Open
}()

written := int64(0)
digest := checksums.NewDigest()
for {
req, err := stream.Recv()
if err == io.EOF {
@@ -321,6 +332,7 @@ func (s *openSavesServer) insertExternalBlob(ctx context.Context, stream pb.Open
return err
}
written += int64(n)
digest.Write(fragment)
}
err = writer.Close()
writer = nil
@@ -339,6 +351,11 @@ func (s *openSavesServer) insertExternalBlob(ctx context.Context, stream pb.Open
return status.Errorf(codes.DataLoss,
"Written byte length (%v) != blob length in metadata sent from client (%v)", written, meta.GetSize())
}
blobref.Checksums = digest.Checksums()
if err := blobref.ValidateIfPresent(meta); err != nil {
log.Error(err)
return err
}
record, _, err := s.metaDB.PromoteBlobRefToCurrent(ctx, blobref)
if err != nil {
log.Errorf("PromoteBlobRefToCurrent failed for object %v: %v", blobref.ObjectPath(), err)
@@ -382,6 +399,9 @@ func (s *openSavesServer) getExternalBlob(ctx context.Context, req *pb.GetBlobRe
return err
}

meta := blobref.ToProto()
stream.Send(&pb.GetBlobResponse{Response: &pb.GetBlobResponse_Metadata{Metadata: meta}})

reader, err := s.blobStore.NewReader(ctx, blobref.ObjectPath())
if err != nil {
log.Errorf("BlobStore.NewReader returned error for object (%v): %v", blobref.ObjectPath(), err)
@@ -426,15 +446,13 @@ func (s *openSavesServer) GetBlob(req *pb.GetBlobRequest, stream pb.OpenSaves_Ge
return err
}

meta := &pb.BlobMetadata{
StoreKey: req.GetStoreKey(),
RecordKey: rr.Key,
Size: rr.BlobSize,
}
stream.Send(&pb.GetBlobResponse{Response: &pb.GetBlobResponse_Metadata{Metadata: meta}})
if rr.ExternalBlob != uuid.Nil {
return s.getExternalBlob(ctx, req, stream, rr)
}

// Handle the inline blob here.
meta := rr.GetInlineBlobMetadata()
stream.Send(&pb.GetBlobResponse{Response: &pb.GetBlobResponse_Metadata{Metadata: meta}})
err = stream.Send(&pb.GetBlobResponse{Response: &pb.GetBlobResponse_Content{Content: rr.Blob}})
if err != nil {
log.Errorf("GetBlob: Stream send error for store (%v), record (%v): %v", req.GetRecordKey(), rr.Key, err)
@@ -508,6 +526,7 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) err
}()

written := 0
digest := checksums.NewDigest()
for {
req, err := stream.Recv()
if err == io.EOF {
@@ -526,6 +545,7 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) err
log.Errorf("UploadChunk: BlobStore write error: %v", err)
return err
}
digest.Write(fragment)
written += n
// TODO(yuryu): This is not suitable for unit tests until we make the value
// configurable, or have a BlobStore mock.
@@ -550,6 +570,13 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) err
// Update the chunk size based on the actual bytes written
// MarkChunkRefReady commits the new change to Datastore
chunk.Size = int32(written)
chunk.Checksums = digest.Checksums()

if err := chunk.ValidateIfPresent(meta); err != nil {
log.Error(err)
return err
}

if err := s.metaDB.MarkChunkRefReady(ctx, chunk); err != nil {
log.Errorf("Failed to update chunkref metadata (%v): %v", chunk.Key, err)
s.chunkRefFail(ctx, chunk)
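All three upload paths in this file (inline blob, external blob, chunk) follow the same pattern: create a digest, feed every received fragment through it while writing, then validate the result against any checksums the client included in the metadata. A simplified sketch of what such a digest helper could look like, assuming it wraps the standard md5 and crc32 hashes — the actual internal/pkg/metadb/checksums implementation may differ:

import (
	"crypto/md5"
	"hash"
	"hash/crc32"
)

// digestSketch is a hypothetical stand-in for the checksums digest used above.
type digestSketch struct {
	md5Hash hash.Hash
	crc32c  hash.Hash32
}

func newDigestSketch() *digestSketch {
	return &digestSketch{
		md5Hash: md5.New(),
		crc32c:  crc32.New(crc32.MakeTable(crc32.Castagnoli)),
	}
}

// Write feeds one received fragment into both hashes; hash.Hash.Write
// is documented to never return an error.
func (d *digestSketch) Write(p []byte) {
	d.md5Hash.Write(p)
	d.crc32c.Write(p)
}

// Sums returns the finished MD5 digest and CRC32C value, ready to be
// stored on the record, blobref, or chunkref.
func (d *digestSketch) Sums() ([]byte, uint32) {
	return d.md5Hash.Sum(nil), d.crc32c.Sum32()
}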
38 changes: 36 additions & 2 deletions internal/app/server/open_saves_test.go
@@ -16,6 +16,8 @@ package server

import (
"context"
"crypto/md5"
"hash/crc32"
"io"
"net"
"testing"
@@ -26,6 +28,7 @@ import (
pb "github.com/googleforgames/open-saves/api"
"github.com/googleforgames/open-saves/internal/pkg/blob"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/checksums"
"github.com/googleforgames/open-saves/internal/pkg/metadb/record"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -476,8 +479,11 @@ func TestOpenSaves_Ping(t *testing.T) {
assert.Equal(t, testString, pong.GetPong())
}

func createBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
storeKey, recordKey string, content []byte) {
// createBlobWithChecksums creates a blob with optional checksums.
// Passing nil to md5 or crc32c skips integrity checking on the server side.
// Passing an incorrect value fails the test.
func createBlobWithChecksums(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
storeKey, recordKey string, content []byte, cs checksums.Checksums) {
t.Helper()

cbc, err := client.CreateBlob(ctx)
@@ -492,6 +498,9 @@ func createBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
StoreKey: storeKey,
RecordKey: recordKey,
Size: int64(len(content)),
Md5: cs.MD5,
Crc32C: cs.GetCRC32C(),
HasCrc32C: cs.HasCRC32C,
},
},
})
@@ -530,6 +539,9 @@ func createBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
assert.Equal(t, storeKey, meta.StoreKey)
assert.Equal(t, recordKey, meta.RecordKey)
assert.Equal(t, int64(len(content)), meta.Size)
// The server always sets the checksums regardless of what the client sends.
assert.NotEmpty(t, meta.Md5)
assert.True(t, meta.HasCrc32C)
}

t.Cleanup(func() {
@@ -541,6 +553,14 @@ func createBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
})
}

// createBlob calls createBlobWithChecksums with both MD5 and CRC32C.
func createBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
storeKey, recordKey string, content []byte) {
digest := checksums.NewDigest()
digest.Write(content)
createBlobWithChecksums(ctx, t, client, storeKey, recordKey, content, digest.Checksums())
}

func verifyBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
storeKey, recordKey string, expectedContent []byte) {
t.Helper()
@@ -562,6 +582,20 @@ func verifyBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
assert.Equal(t, storeKey, meta.StoreKey)
assert.Equal(t, recordKey, meta.RecordKey)
assert.Equal(t, int64(len(expectedContent)), meta.Size)

if len(expectedContent) > 0 {
md5 := md5.New()
md5.Write(expectedContent)
assert.Equal(t, md5.Sum(nil), meta.Md5)

crc32c := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc32c.Write(expectedContent)
assert.True(t, meta.HasCrc32C)
assert.Equal(t, crc32c.Sum32(), meta.Crc32C)
} else {
assert.Empty(t, meta.Md5)
assert.False(t, meta.HasCrc32C)
}
}

recvd := 0
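From a client's perspective, the checksums are optional fields on the BlobMetadata sent as the first CreateBlob message, exactly as createBlobWithChecksums does above. A hypothetical helper that fills them in with the standard-library hashes (field names as used by the generated pb package in these tests):

import (
	"crypto/md5"
	"hash/crc32"

	pb "github.com/googleforgames/open-saves/api"
)

// buildBlobMetadata is a hypothetical helper; it populates the optional
// checksum fields so the server can verify the upload on its side.
func buildBlobMetadata(storeKey, recordKey string, content []byte) *pb.BlobMetadata {
	md5Sum := md5.Sum(content)
	crc := crc32.Checksum(content, crc32.MakeTable(crc32.Castagnoli))
	return &pb.BlobMetadata{
		StoreKey:  storeKey,
		RecordKey: recordKey,
		Size:      int64(len(content)),
		Md5:       md5Sum[:],
		Crc32C:    crc,
		HasCrc32C: true, // 0 is a valid CRC32C, so presence needs an explicit flag
	}
}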
9 changes: 9 additions & 0 deletions internal/pkg/metadb/blobref/blobref.go
@@ -18,6 +18,7 @@ import (
"cloud.google.com/go/datastore"
"github.com/google/uuid"
pb "github.com/googleforgames/open-saves/api"
"github.com/googleforgames/open-saves/internal/pkg/metadb/checksums"
"github.com/googleforgames/open-saves/internal/pkg/metadb/timestamps"
)

@@ -38,6 +39,11 @@ type BlobRef struct {
// Chunked is whether the BlobRef is chunked or not.
Chunked bool

// Checksums holds the checksums for the blob object associated with the BlobRef entity.
// Record.{MD5,CRC32C} must be used for inline blobs, and
// ChunkRef.{MD5,CRC32C} must be used for chunked blobs.
checksums.Checksums `datastore:",flatten"`

// Timestamps keeps track of creation and modification times and stores a randomly
// generated UUID to maintain consistency.
Timestamps timestamps.Timestamps
@@ -109,5 +115,8 @@ func (b *BlobRef) ToProto() *pb.BlobMetadata {
StoreKey: b.StoreKey,
RecordKey: b.RecordKey,
Size: b.Size,
Md5: b.MD5,
Crc32C: b.GetCRC32C(),
HasCrc32C: b.HasCRC32C,
}
}
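The ValidateIfPresent calls in open_saves.go compare the server-computed checksums against whatever the client supplied, skipping any field the client did not set. A hedged sketch of that comparison — the real method lives in the internal checksums package and may differ in detail:

import (
	"bytes"
	"errors"

	pb "github.com/googleforgames/open-saves/api"
	"github.com/googleforgames/open-saves/internal/pkg/metadb/checksums"
)

// validateIfPresentSketch mirrors the observed behavior: mismatches are
// errors, absent client checksums are ignored. Field and getter names
// follow their usage elsewhere in this diff.
func validateIfPresentSketch(computed checksums.Checksums, meta *pb.BlobMetadata) error {
	if len(meta.GetMd5()) > 0 && !bytes.Equal(meta.GetMd5(), computed.MD5) {
		return errors.New("MD5 mismatch between client metadata and written blob")
	}
	if meta.GetHasCrc32C() && meta.GetCrc32C() != computed.GetCRC32C() {
		return errors.New("CRC32C mismatch between client metadata and written blob")
	}
	return nil
}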
22 changes: 16 additions & 6 deletions internal/pkg/metadb/blobref/blobref_test.go
@@ -19,6 +19,8 @@ import (

"cloud.google.com/go/datastore"
"github.com/google/uuid"
"github.com/googleforgames/open-saves/internal/pkg/metadb/checksums/checksumstest"
"github.com/googleforgames/open-saves/internal/pkg/metadb/timestamps"
"github.com/stretchr/testify/assert"
)

@@ -38,11 +40,14 @@ func TestBlobRef_Save(t *testing.T) {
)

blob := BlobRef{
Size: size,
Status: StatusInitializing,
StoreKey: store,
RecordKey: record,
Size: size,
Status: StatusInitializing,
StoreKey: store,
RecordKey: record,
Checksums: checksumstest.RandomChecksums(t),
Timestamps: timestamps.New(),
}

expected := []datastore.Property{
{
Name: "Size",
@@ -69,8 +74,9 @@ func TestBlobRef_Save(t *testing.T) {
assert.NoError(t, err)
if assert.NotNil(t, actual) {
assert.Equal(t, expected, actual[:len(expected)])
if assert.Equal(t, 6, len(actual)) {
assert.Equal(t, "Timestamps", actual[5].Name)
if assert.Equal(t, 9, len(actual)) {
checksumstest.AssertPropertyListMatch(t, blob.Checksums, actual[5:8])
assert.Equal(t, "Timestamps", actual[8].Name)
}
}
}
@@ -105,7 +111,9 @@ func TestBlobRef_Load(t *testing.T) {
Status: StatusReady,
StoreKey: store,
RecordKey: record,
Checksums: checksumstest.RandomChecksums(t),
}
properties = append(properties, checksumstest.ChecksumsToProperties(t, expected.Checksums)...)
actual := new(BlobRef)
err := actual.Load(properties)
if assert.NoError(t, err) {
@@ -126,11 +134,13 @@ func TestBlobRef_ToProto(t *testing.T) {
record = "record"
)
b := NewBlobRef(size, store, record)
b.Checksums = checksumstest.RandomChecksums(t)

proto := b.ToProto()
if assert.NotNil(t, proto) {
assert.Equal(t, b.StoreKey, proto.GetStoreKey())
assert.Equal(t, b.RecordKey, proto.GetRecordKey())
assert.Equal(t, b.Size, proto.GetSize())
checksumstest.AssertProtoEqual(t, b.Checksums, proto)
}
}
13 changes: 12 additions & 1 deletion internal/pkg/metadb/blobref/chunkref/chunkref.go
@@ -20,6 +20,7 @@ import (
pb "github.com/googleforgames/open-saves/api"
"github.com/googleforgames/open-saves/internal/pkg/cache"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/checksums"
"github.com/googleforgames/open-saves/internal/pkg/metadb/timestamps"
"github.com/vmihailenco/msgpack/v5"
)
@@ -31,13 +32,16 @@ type ChunkRef struct {
Key uuid.UUID `datastore:"-"`
// BlobRef is the key of parent BlobRef.
BlobRef uuid.UUID `datastore:"-"`

// Number is the position of the chunk in the BlobRef.
Number int32
// Size is the byte size of the chunk.
Size int32
// Status is the current status of the chunk.
blobref.Status

// Checksums contains checksums for the chunk object.
checksums.Checksums `datastore:",flatten"`

// Timestamps keeps track of creation and modification times and stores a randomly
// generated UUID to maintain consistency.
Timestamps timestamps.Timestamps
@@ -69,10 +73,14 @@ func (c *ChunkRef) LoadKey(k *datastore.Key) error {
// Save and Load replicate the default behaviors; however, they are required
// for the KeyLoader interface.

// Load implements the Datastore PropertyLoadSaver interface and converts Datastore
// properties to corresponding struct fields.
func (c *ChunkRef) Load(ps []datastore.Property) error {
return datastore.LoadStruct(c, ps)
}

// Save implements the Datastore PropertyLoadSaver interface and converts struct fields
// to Datastore properties.
func (c *ChunkRef) Save() ([]datastore.Property, error) {
return datastore.SaveStruct(c)
}
@@ -127,5 +135,8 @@ func (c *ChunkRef) ToProto() *pb.ChunkMetadata {
SessionId: c.BlobRef.String(),
Number: int64(c.Number),
Size: int64(c.Size),
Md5: c.MD5,
Crc32C: c.GetCRC32C(),
HasCrc32C: c.HasCRC32C,
}
}
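A note on the presence flags that appear on BlobRef, ChunkRef, and the protos: an MD5 can be treated as absent when the byte slice is empty, but every uint32 — including 0 — is a valid CRC32C value, which is presumably why a separate HasCrc32C flag is carried everywhere (and why verifyBlob above asserts it is false for empty content). A tiny illustration:

package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	table := crc32.MakeTable(crc32.Castagnoli)

	// CRC32C of empty input is 0, indistinguishable from "no checksum"
	// without an explicit presence flag.
	fmt.Println(crc32.Checksum(nil, table)) // 0

	// Non-empty input can hash to any uint32, so no sentinel value can
	// encode "absent" either.
	fmt.Println(crc32.Checksum([]byte("chunk data"), table))
}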