fix: return chunk size and count with GetRecord #312

Merged · 5 commits · Oct 5, 2021
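In brief: this change surfaces chunked-blob metadata on the owning record. PromoteBlobRefToCurrent now stamps the record with the blob's total size, a Chunked flag, and the number of chunks (RemoveBlobFromRecord clears them), UploadChunk returns the stored chunk's metadata, and CommitChunkedUploadRequest gains a performance Hint that is applied when the updated record is cached. After a chunked upload is committed, a client can read the totals straight from GetRecord. A minimal sketch of the client-visible effect, assuming the generated Go client package under api/ and a reachable server; the address and keys are placeholders, not part of this diff:

	package main

	import (
		"context"
		"log"

		pb "github.com/googleforgames/open-saves/api"
		"google.golang.org/grpc"
	)

	func main() {
		ctx := context.Background()
		conn, err := grpc.Dial("localhost:6000", grpc.WithInsecure())
		if err != nil {
			log.Fatalf("grpc.Dial failed: %v", err)
		}
		defer conn.Close()

		client := pb.NewOpenSavesClient(conn)
		record, err := client.GetRecord(ctx, &pb.GetRecordRequest{
			StoreKey: "my-store",
			Key:      "my-record",
		})
		if err != nil {
			log.Fatalf("GetRecord failed: %v", err)
		}
		if record.GetChunked() {
			// Both fields are populated when a chunked upload is committed.
			log.Printf("blob is %d bytes in %d chunks", record.GetBlobSize(), record.GetNumberOfChunks())
		}
	}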
590 changes: 301 additions & 289 deletions api/open_saves.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/open_saves.proto
@@ -518,6 +518,9 @@ message ChunkMetadata {
 message CommitChunkedUploadRequest {
   // session_id is the ID of a chunked upload session to commit.
   string session_id = 1;
+
+  // Performance hints.
+  Hint hint = 2;
 }
 
 message AbortChunkedUploadRequest {
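The server threads this hint through to cacheRecord when the commit promotes the blob (see internal/app/server/open_saves.go below). A hedged sketch of a commit that passes a hint, reusing the client from the sketch above; the concrete Hint fields live elsewhere in open_saves.proto and are not shown in this diff, so an empty message stands in here:

	meta, err := client.CommitChunkedUpload(ctx, &pb.CommitChunkedUploadRequest{
		SessionId: sessionId,  // returned earlier by CreateChunkedBlob
		Hint:      &pb.Hint{}, // populate with caching hints as needed
	})
	if err != nil {
		log.Fatalf("CommitChunkedUpload failed: %v", err)
	}
	log.Printf("committed blob: %d bytes", meta.GetSize())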
5 changes: 3 additions & 2 deletions internal/app/server/open_saves.go
@@ -583,7 +583,7 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) error {
 		// Do not delete the chunk object here. Leave it to the garbage collector.
 		return err
 	}
-	return stream.SendAndClose(meta)
+	return stream.SendAndClose(chunk.ToProto())
 }
 
 func (s *openSavesServer) CommitChunkedUpload(ctx context.Context, req *pb.CommitChunkedUploadRequest) (*pb.BlobMetadata, error) {
@@ -597,12 +597,13 @@ func (s *openSavesServer) CommitChunkedUpload(ctx context.Context, req *pb.CommitChunkedUploadRequest) (*pb.BlobMetadata, error) {
 		log.Errorf("Cannot retrieve chunked blob metadata for session (%v): %v", blobKey, err)
 		return nil, err
 	}
-	_, blob, err = s.metaDB.PromoteBlobRefToCurrent(ctx, blob)
+	record, blob, err := s.metaDB.PromoteBlobRefToCurrent(ctx, blob)
 	if err != nil {
 		log.Errorf("PromoteBlobRefToCurrent failed for object %v: %v", blob.ObjectPath(), err)
 		// Do not delete the blob object here. Leave it to the garbage collector.
 		return nil, err
 	}
+	s.cacheRecord(ctx, record, req.GetHint())
 	return blob.ToProto(), nil
 }
209 changes: 201 additions & 8 deletions internal/app/server/open_saves_test.go
@@ -16,8 +16,6 @@ package server
 
 import (
 	"context"
-	"crypto/md5"
-	"hash/crc32"
 	"io"
 	"net"
 	"testing"
@@ -584,14 +582,13 @@ func verifyBlob(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
 	assert.Equal(t, int64(len(expectedContent)), meta.Size)
 
 	if len(expectedContent) > 0 {
-		md5 := md5.New()
-		md5.Write(expectedContent)
-		assert.Equal(t, md5.Sum(nil), meta.Md5)
+		digest := checksums.NewDigest()
+		digest.Write(expectedContent)
+		checksums := digest.Checksums()
 
-		crc32c := crc32.New(crc32.MakeTable(crc32.Castagnoli))
-		crc32c.Write(expectedContent)
+		assert.Equal(t, checksums.MD5, meta.Md5)
 		assert.True(t, meta.HasCrc32C)
-		assert.Equal(t, crc32c.Sum32(), meta.Crc32C)
+		assert.Equal(t, checksums.GetCRC32C(), meta.Crc32C)
 	} else {
 		assert.Empty(t, meta.Md5)
 		assert.False(t, meta.HasCrc32C)
@@ -1086,3 +1083,199 @@ func TestOpenSaves_AtomicIncDecInt(t *testing.T) {
 	assert.Nil(t, res)
 	assert.Equal(t, codes.NotFound, status.Code(err))
 }
+
+func uploadChunk(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
+	sessionId string, number int64, content []byte) {
+	t.Helper()
+
+	digest := checksums.NewDigest()
+	digest.Write(content)
+	cs := digest.Checksums()
+
+	ucc, err := client.UploadChunk(ctx)
+	if err != nil {
+		t.Errorf("UploadChunk returned error: %v", err)
+		return
+	}
+
+	err = ucc.Send(&pb.UploadChunkRequest{
+		Request: &pb.UploadChunkRequest_Metadata{
+			Metadata: &pb.ChunkMetadata{
+				SessionId: sessionId,
+				Number:    number,
+				Md5:       cs.MD5,
+				Crc32C:    cs.GetCRC32C(),
+				HasCrc32C: cs.HasCRC32C,
+			},
+		},
+	})
+	if err != nil {
+		t.Errorf("UploadChunkClient.Send failed on sending metadata: %v", err)
+		return
+	}
+
+	sent := 0
+	for {
+		if sent >= len(content) {
+			break
+		}
+		toSend := streamBufferSize
+		if toSend > len(content)-sent {
+			toSend = len(content) - sent
+		}
+		err = ucc.Send(&pb.UploadChunkRequest{
+			Request: &pb.UploadChunkRequest_Content{
+				Content: content[sent : sent+toSend],
+			},
+		})
+		if err != nil {
+			t.Errorf("UploadChunkClient.Send failed on sending content: %v", err)
+		}
+		sent += toSend
+	}
+	assert.Equal(t, len(content), sent)
+
+	meta, err := ucc.CloseAndRecv()
+	if err != nil {
+		t.Errorf("UploadChunkClient.CloseAndRecv failed: %v", err)
+		return
+	}
+	if assert.NotNil(t, meta) {
+		assert.Equal(t, sessionId, meta.SessionId)
+		assert.Equal(t, number, meta.Number)
+		assert.Equal(t, int64(len(content)), meta.Size)
+		// The server always sets the checksums regardless of what the client sends.
+		assert.NotEmpty(t, meta.Md5)
+		assert.True(t, meta.HasCrc32C)
+	}
+}
+
+func verifyChunk(ctx context.Context, t *testing.T, client pb.OpenSavesClient,
+	storeKey, recordKey string, sessionId string, number int64, expectedContent []byte) {
+	t.Helper()
+	gbc, err := client.GetBlobChunk(ctx, &pb.GetBlobChunkRequest{
+		StoreKey:    storeKey,
+		RecordKey:   recordKey,
+		ChunkNumber: number,
+	})
+	if err != nil {
+		t.Errorf("GetBlobChunk returned error: %v", err)
+		return
+	}
+	res, err := gbc.Recv()
+	if err != nil {
+		t.Errorf("GetBlobChunkClient.Recv returned error: %v", err)
+		return
+	}
+	meta := res.GetMetadata()
+	if assert.NotNil(t, meta, "First returned message must be metadata") {
+		assert.Equal(t, sessionId, meta.SessionId)
+		assert.Equal(t, number, meta.Number)
+		assert.Equal(t, int64(len(expectedContent)), meta.Size)
+
+		if len(expectedContent) > 0 {
+			digest := checksums.NewDigest()
+			digest.Write(expectedContent)
+			checksums := digest.Checksums()
+
+			assert.Equal(t, checksums.MD5, meta.Md5)
+			assert.True(t, meta.HasCrc32C)
+			assert.Equal(t, checksums.GetCRC32C(), meta.Crc32C)
+		} else {
+			assert.Empty(t, meta.Md5)
+			assert.False(t, meta.HasCrc32C)
+		}
+	}
+
+	recvd := 0
+	for {
+		res, err = gbc.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Errorf("GetBlobChunkClient.Recv returned error: %v", err)
+			return
+		}
+		content := res.GetContent()
+		if assert.NotNil(t, content, "Second returned message must be content") {
+			assert.Equal(t, expectedContent[recvd:recvd+len(content)], content)
+			recvd += len(content)
+		}
+	}
+	assert.Equal(t, int64(recvd), meta.Size, "Received bytes should match")
+}
+
+func TestOpenSaves_UploadChunkedBlob(t *testing.T) {
+	ctx := context.Background()
+	_, listener := getOpenSavesServer(ctx, t, "gcp")
+	_, client := getTestClient(ctx, t, listener)
+	store := &pb.Store{Key: uuid.NewString()}
+	setupTestStore(ctx, t, client, store)
+	record := &pb.Record{Key: uuid.NewString()}
+	setupTestRecord(ctx, t, client, store.Key, record)
+
+	const chunkSize = 1*1024*1024 + 13 // 1 MiB + 13 B
+	const numberOfChunks = 4
+	testChunk := make([]byte, chunkSize)
+	for i := 0; i < chunkSize; i++ {
+		testChunk[i] = byte(i % 256)
+	}
+
+	beforeCreateChunk := time.Now()
+	var sessionId string
+	if res, err := client.CreateChunkedBlob(ctx, &pb.CreateChunkedBlobRequest{
+		StoreKey:  store.Key,
+		RecordKey: record.Key,
+		ChunkSize: chunkSize,
+	}); assert.NoError(t, err) {
+		if assert.NotNil(t, res) {
+			_, err := uuid.Parse(res.SessionId)
+			assert.NoError(t, err)
+			sessionId = res.SessionId
+		}
+	}
+	t.Cleanup(func() {
+		client.DeleteBlob(ctx, &pb.DeleteBlobRequest{StoreKey: store.Key, RecordKey: record.Key})
+	})
+
+	for i := 0; i < numberOfChunks; i++ {
+		uploadChunk(ctx, t, client, sessionId, int64(i), testChunk)
+	}
+
+	if meta, err := client.CommitChunkedUpload(ctx, &pb.CommitChunkedUploadRequest{
+		SessionId: sessionId,
+	}); assert.NoError(t, err) {
+		assert.Equal(t, int64(len(testChunk)*numberOfChunks), meta.Size)
+		assert.False(t, meta.HasCrc32C)
+		assert.Empty(t, meta.Md5)
+		assert.Equal(t, store.Key, meta.StoreKey)
+		assert.Equal(t, record.Key, meta.RecordKey)
+	}
+
+	// Check that the metadata is reflected to the record as well.
+	if updatedRecord, err := client.GetRecord(ctx, &pb.GetRecordRequest{
+		StoreKey: store.Key, Key: record.Key,
+	}); assert.NoError(t, err) {
+		if assert.NotNil(t, updatedRecord) {
+			assert.Equal(t, int64(len(testChunk)*numberOfChunks), updatedRecord.BlobSize)
+			assert.Equal(t, int64(numberOfChunks), updatedRecord.NumberOfChunks)
+			assert.True(t, updatedRecord.Chunked)
+			assert.True(t, record.GetCreatedAt().AsTime().Equal(updatedRecord.GetCreatedAt().AsTime()))
+			assert.True(t, beforeCreateChunk.Before(updatedRecord.GetUpdatedAt().AsTime()))
+		}
+	}
+
+	for i := 0; i < numberOfChunks; i++ {
+		verifyChunk(ctx, t, client, store.Key, record.Key, sessionId, int64(i), testChunk)
+	}
+
+	// Deletion test
+	if _, err := client.DeleteBlob(ctx, &pb.DeleteBlobRequest{
+		StoreKey:  store.Key,
+		RecordKey: record.Key,
+	}); err != nil {
+		t.Errorf("DeleteBlob failed: %v", err)
+	}
+	verifyBlob(ctx, t, client, store.Key, record.Key, make([]byte, 0))
+}
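The tests above lean on the repo's internal checksums helper in place of the raw crypto/md5 and hash/crc32 calls. A minimal sketch of its use, limited to the calls actually exercised in this diff (NewDigest, Write, Checksums, and the MD5/GetCRC32C/HasCRC32C accessors):

	digest := checksums.NewDigest()
	digest.Write(data)       // hash.Hash-style accumulation
	cs := digest.Checksums() // snapshot of the computed checksums
	_ = cs.MD5               // MD5 sum as a byte slice
	_ = cs.GetCRC32C()       // CRC32C (Castagnoli) as uint32
	_ = cs.HasCRC32C         // whether a CRC32C value is present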
15 changes: 11 additions & 4 deletions internal/pkg/metadb/metadb.go
@@ -461,16 +461,17 @@ func (m *MetaDB) getReadyChunks(ctx context.Context, tx *ds.Transaction, blob *blobref.BlobRef)
 	return chunks, nil
 }
 
-func (m *MetaDB) chunkObjectsSizeSum(ctx context.Context, tx *ds.Transaction, blob *blobref.BlobRef) (int64, error) {
+// return size, number of chunks, error
+func (m *MetaDB) chunkObjectsSizeSum(ctx context.Context, tx *ds.Transaction, blob *blobref.BlobRef) (int64, int64, error) {
 	chunks, err := m.getReadyChunks(ctx, tx, blob)
 	if err != nil {
-		return 0, err
+		return 0, 0, err
 	}
 	size := int64(0)
 	for _, chunk := range chunks {
 		size += int64(chunk.Size)
 	}
-	return size, nil
+	return size, int64(len(chunks)), nil
 }
 
 // PromoteBlobRefToCurrent promotes the provided BlobRef object as a current
@@ -503,11 +504,14 @@ func (m *MetaDB) PromoteBlobRefToCurrent(ctx context.Context, blob *blobref.BlobRef)
 		// Update the blob size for chunked uploads
 		if blob.Chunked {
 			// TODO(yuryu): should check if chunks are continuous?
-			size, err := m.chunkObjectsSizeSum(ctx, tx, blob)
+			size, num, err := m.chunkObjectsSizeSum(ctx, tx, blob)
 			if err != nil {
 				return err
 			}
+			record.NumberOfChunks = num
 			blob.Size = size
+		} else {
+			record.NumberOfChunks = 0
 		}
 		if blob.Status != blobref.StatusReady {
 			if blob.Ready() != nil {
@@ -521,6 +525,7 @@ func (m *MetaDB) PromoteBlobRefToCurrent(ctx context.Context, blob *blobref.BlobRef)
 
 		record.BlobSize = blob.Size
 		record.ExternalBlob = blob.Key
+		record.Chunked = blob.Chunked
 		record.Timestamps.Update()
 		if err := m.mutateSingleInTransaction(tx, ds.NewUpdate(rkey, record)); err != nil {
 			return err
@@ -552,6 +557,8 @@ func (m *MetaDB) RemoveBlobFromRecord(ctx context.Context, storeKey string, recordKey string)
 	}
 
 	record.BlobSize = 0
+	record.Chunked = false
+	record.NumberOfChunks = 0
 	if record.ExternalBlob == uuid.Nil && len(record.Blob) > 0 {
 		record = removeInlineBlob(record)
 		record.Timestamps.Update()
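For reference, a minimal sketch of the record fields this change keeps in sync. The field names and types are taken from the diff above; the real Record type in internal/pkg/metadb carries many more fields (timestamps, properties, tags, and so on):

	type Record struct {
		BlobSize       int64     // total size of the current blob, in bytes
		Chunked        bool      // true when the current blob was uploaded in chunks
		NumberOfChunks int64     // chunk count; reset to 0 for non-chunked blobs
		ExternalBlob   uuid.UUID // key of the current external blob (uuid.Nil if none)
		// ... remaining fields omitted
	}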
21 changes: 18 additions & 3 deletions internal/pkg/metadb/metadb_test.go
@@ -513,6 +513,8 @@ func TestMetaDB_SimpleCreateGetDeleteBlobRef(t *testing.T) {
 		assert.Nil(t, promoRecord.Blob)
 		assert.Equal(t, blobKey, promoRecord.ExternalBlob)
 		assert.Equal(t, blob.Size, promoRecord.BlobSize)
+		assert.Zero(t, promoRecord.Chunked)
+		assert.Zero(t, promoRecord.NumberOfChunks)
 		assert.True(t, beforePromo.Before(promoRecord.Timestamps.UpdatedAt))
 		assert.NotEqual(t, record.Timestamps.Signature, promoRecord.Timestamps.Signature)
 	}
@@ -545,6 +547,8 @@
 		assert.Equal(t, uuid.Nil, delPendRecord.ExternalBlob)
 		assert.Empty(t, delPendRecord.Blob)
 		assert.Zero(t, delPendRecord.BlobSize)
+		assert.Zero(t, delPendRecord.Chunked)
+		assert.Zero(t, delPendRecord.NumberOfChunks)
 	}
 	if assert.NotNil(t, delPendBlob) {
 		assert.Equal(t, blobref.StatusPendingDeletion, delPendBlob.Status)
@@ -935,7 +939,7 @@ func TestMetaDB_SimpleCreateGetDeleteChunkedBlob(t *testing.T) {
 	metaDB := newMetaDB(ctx, t)
 	ds := newDatastoreClient(ctx, t)
 
-	store, record, blob := setupTestStoreRecordBlobSet(ctx, t, metaDB, true)
+	store, _, blob := setupTestStoreRecordBlobSet(ctx, t, metaDB, true)
 
 	// Create Chunks with Initializing state
 	chunks := []*chunkref.ChunkRef{}
@@ -961,10 +965,15 @@
 			assert.Equal(t, blobref.StatusReady, got.Status)
 		}
 	}
-	_, blobRetrieved, err := metaDB.PromoteBlobRefToCurrent(ctx, blob)
+	record, blobRetrieved, err := metaDB.PromoteBlobRefToCurrent(ctx, blob)
 	if err != nil {
 		t.Fatalf("PromoteBlobRefToCurrent failed: %v", err)
 	}
+	if assert.NotNil(t, record) {
+		assert.EqualValues(t, testChunKSize*testChunkCount, record.BlobSize)
+		assert.EqualValues(t, testChunkCount, record.NumberOfChunks)
+		assert.True(t, record.Chunked)
+	}
 	if assert.NotNil(t, blobRetrieved) {
 		assert.EqualValues(t, testChunKSize*testChunkCount, blobRetrieved.Size)
 	}
@@ -985,8 +994,14 @@
 	}
 
 	// Delete chunks and blob
-	if _, _, err := metaDB.RemoveBlobFromRecord(ctx, store.Key, record.Key); err != nil {
+	if record, _, err := metaDB.RemoveBlobFromRecord(ctx, store.Key, record.Key); err != nil {
 		t.Fatalf("RemoveBlobFromRecord should work with Chunked blobs")
+	} else {
+		if assert.NotNil(t, record) {
+			assert.Zero(t, record.BlobSize)
+			assert.False(t, record.Chunked)
+			assert.Zero(t, record.NumberOfChunks)
+		}
 	}
 
 	// Check if child chunks are marked as well