feat: chunk count checking #361

Merged 2 commits on Jul 27, 2022
528 changes: 281 additions & 247 deletions api/open_saves.pb.go

Large diffs are not rendered by default.

15 changes: 12 additions & 3 deletions api/open_saves.proto
@@ -317,7 +317,6 @@ message DeleteStoreRequest {
   string key = 1;
 }
 
-
 message CreateRecordRequest {
   // The key of the store to create the record into.
   string store_key = 1;
@@ -343,7 +342,6 @@ message GetRecordRequest {
 // QueryRecordsRequest contains conditions to search particular records.
 // Multiple conditions are AND'ed together.
 message QueryRecordsRequest {
-
   // store_key is the primary key of the store.
   // Optional and the method queries all stores when omitted.
   string store_key = 1;
@@ -498,6 +496,12 @@ message BlobMetadata {
 
   // has_crc32c indicates if crc32c is present.
   bool has_crc32c = 7;
+
+  // chunked is set true if the attached blob is chunked, otherwise false (read only).
+  bool chunked = 8;
+
+  // Number of chunks (read only).
+  int64 chunk_count = 9;
 }
 
 message CreateChunkedBlobRequest {
@@ -509,6 +513,11 @@ message CreateChunkedBlobRequest {
 
   // Size of each chunk
   int64 chunk_size = 3;
+
+  // Expected number of chunks.
+  // When set to non-zero, the server checks if it has received the exact number of
+  // chunks when CommitChunkedUpload is called.
+  int64 chunk_count = 4;
 }
 
 message CreateChunkedBlobResponse {
@@ -533,7 +542,7 @@ message ChunkMetadata {
   // session_id is the ID of a chunk upload session. Not used for downloads.
   string session_id = 1;
 
-  // number is the number
+  // number is the number of the chunk.
   int64 number = 2;
 
   // size is a byte size of the chunk.
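
Taken together, these proto changes let a client declare how many chunks it will upload and have the server verify that count when the upload is committed. Below is a minimal client-side sketch, assuming the generated Go package under api/ and its OpenSavesClient interface; the import path, helper name, keys, and chunk size are illustrative assumptions, not code from this PR:

package example // illustrative sketch, not part of this PR

import (
    "context"
    "fmt"

    pb "github.com/googleforgames/open-saves/api"
)

// createWithDeclaredCount opens a chunked upload session that declares its
// chunk count up front. The keys and chunk size are placeholder values.
func createWithDeclaredCount(ctx context.Context, client pb.OpenSavesClient, storeKey, recordKey string) (string, error) {
    res, err := client.CreateChunkedBlob(ctx, &pb.CreateChunkedBlobRequest{
        StoreKey:  storeKey,
        RecordKey: recordKey,
        ChunkSize: 1024,
        // With a non-zero count, CommitChunkedUpload fails with
        // FailedPrecondition until exactly this many chunks have arrived.
        ChunkCount: 4,
    })
    if err != nil {
        return "", fmt.Errorf("CreateChunkedBlob: %w", err)
    }
    return res.SessionId, nil
}
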
5 changes: 4 additions & 1 deletion docs/reference.md
@@ -149,6 +149,8 @@ when creating (uploading) a new blob object.
 | md5 | [bytes](#bytes) | | md5 is the MD5 hash of the blob content. If supplied for uploads, the server validates the content using the hash value. For downloads, the server returns the stored hash value of the blob content. The length of the hash value is 0 (not present) or 16 (present) bytes. |
 | crc32c | [uint32](#uint32) | | crc32c is the CRC32C checksum of the blob content. Specifically, it uses the Castagnoli polynomial. https://pkg.go.dev/hash/crc32#pkg-constants If supplied for uploads, the server validates the content using the checksum. For downloads, the server returns the checksum of the blob content. Open Saves provides both MD5 and CRC32C because CRC32C is often used by Cloud object storage services. |
 | has_crc32c | [bool](#bool) | | has_crc32c indicates if crc32c is present. |
+| chunked | [bool](#bool) | | chunked is set true if the attached blob is chunked, otherwise false (read only). |
+| chunk_count | [int64](#int64) | | Number of chunks (read only). |
@@ -164,7 +166,7 @@ when creating (uploading) a new blob object.
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
 | session_id | [string](#string) | | session_id is the ID of a chunk upload session. Not used for downloads. |
-| number | [int64](#int64) | | number is the number |
+| number | [int64](#int64) | | number is the number of the chunk. |
 | size | [int64](#int64) | | size is a byte size of the chunk. |
 | hint | [Hint](#opensaves-Hint) | | Performance hints (write only). |
 | md5 | [bytes](#bytes) | | md5 is the MD5 hash of the chunk content. If supplied for uploads, the server validates the content using the hash value. For downloads, the server returns the stored hash value of the chunk content. The length of the hash value is 0 (not present) or 16 (present) bytes. |
@@ -258,6 +260,7 @@ should contain content.
 | store_key | [string](#string) | | store_key is the key of the store that the record belongs to. |
 | record_key | [string](#string) | | record_key is the key of the record the new blob object belongs to. |
 | chunk_size | [int64](#int64) | | Size of each chunk |
+| chunk_count | [int64](#int64) | | Expected number of chunks. When set to non-zero, the server checks if it has received the exact number of chunks when CommitChunkedUpload is called. |
18 changes: 9 additions & 9 deletions internal/app/collector/collector_test.go
@@ -147,13 +147,13 @@ func TestCollector_DeletesBlobs(t *testing.T) {
     collector := newTestCollector(ctx, t)
     store := setupTestStore(ctx, t, collector)
     record := setupTestRecord(ctx, t, collector, store.Key)
-    const numBlobRefs = 5
-    blobRefs := make([]*blobref.BlobRef, 0, numBlobRefs)
+    const blobRefCount = 5
+    blobRefs := make([]*blobref.BlobRef, 0, blobRefCount)
 
     // 0 and 2 are old, to be deleted
     // 1 and 3 have the applicable statuses but new
     // 4 is still initializing
-    for i := 0; i < numBlobRefs; i++ {
+    for i := 0; i < blobRefCount; i++ {
         blobRef := blobref.NewBlobRef(0, store.Key, record.Key)
         blobRef.Timestamps.CreatedAt = collector.cfg.Before
         blobRef.Timestamps.UpdatedAt = collector.cfg.Before
@@ -197,10 +197,10 @@ func TestCollector_DeletesUnlinkedBlobRefs(t *testing.T) {
     collector := newTestCollector(ctx, t)
     store := setupTestStore(ctx, t, collector)
     record := setupTestRecord(ctx, t, collector, store.Key)
-    const numBlobRefs = 3
-    blobRefs := make([]*blobref.BlobRef, 0, numBlobRefs)
+    const blobRefCount = 3
+    blobRefs := make([]*blobref.BlobRef, 0, blobRefCount)
     ds := newDatastoreClient(ctx, t)
-    for i := 0; i < numBlobRefs; i++ {
+    for i := 0; i < blobRefCount; i++ {
         blobRef := blobref.NewBlobRef(0, store.Key, record.Key)
         blobRef.Fail() // Fail() updates Timestamps so needs to come here.
         blobRef.Timestamps.CreatedAt = collector.cfg.Before.Add(-1 * time.Second)
@@ -226,15 +226,15 @@
 }
 
 func TestCollector_DeletesChunkedBlobs(t *testing.T) {
+    const chunkRefCount = 5
+
     ctx := context.Background()
     collector := newTestCollector(ctx, t)
     store := setupTestStore(ctx, t, collector)
     record := setupTestRecord(ctx, t, collector, store.Key)
-    blob := blobref.NewChunkedBlobRef(store.Key, record.Key)
+    blob := blobref.NewChunkedBlobRef(store.Key, record.Key, chunkRefCount)
     ds := newDatastoreClient(ctx, t)
     setupTestBlobRef(ctx, t, ds, blob)
 
-    const chunkRefCount = 5
     chunks := make([]*chunkref.ChunkRef, 0, chunkRefCount)
 
     // 0 and 2 are old, to be deleted
4 changes: 2 additions & 2 deletions internal/app/server/open_saves.go
@@ -490,7 +490,7 @@ func (s *openSavesServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
 }
 
 func (s *openSavesServer) CreateChunkedBlob(ctx context.Context, req *pb.CreateChunkedBlobRequest) (*pb.CreateChunkedBlobResponse, error) {
-    b := blobref.NewChunkedBlobRef(req.GetStoreKey(), req.GetRecordKey())
+    b := blobref.NewChunkedBlobRef(req.GetStoreKey(), req.GetRecordKey(), req.GetChunkCount())
     b, err := s.metaDB.InsertBlobRef(ctx, b)
     if err != nil {
         log.Errorf("CreateChunkedBlob failed for store (%v), record (%v): %v", req.GetStoreKey(), req.GetRecordKey(), err)
@@ -609,7 +609,7 @@ func (s *openSavesServer) CommitChunkedUpload(ctx context.Context, req *pb.CommitChunkedUploadRequest) (*pb.BlobMetadata, error) {
         log.Errorf("Cannot retrieve chunked blob metadata for session (%v): %v", blobKey, err)
         return nil, err
     }
-    record, blob, err := s.metaDB.PromoteBlobRefToCurrent(ctx, blob)
+    record, _, err := s.metaDB.PromoteBlobRefToCurrent(ctx, blob)
     if err != nil {
         log.Errorf("PromoteBlobRefToCurrent failed for object %v: %v", blob.ObjectPath(), err)
         // Do not delete the blob object here. Leave it to the garbage collector.
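
The enforcement itself happens in the MetaDB layer at commit time, which is not rendered on this page. The sketch below only captures the rule the tests pin down: a non-zero declared count must equal the number of uploaded chunks when CommitChunkedUpload runs, otherwise the call fails with FailedPrecondition. This is a hypothetical reconstruction; validateChunkCount and its parameters are invented names, not the repository's API:

package example // hypothetical sketch, not code from this PR

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// validateChunkCount applies the rule described in open_saves.proto:
// a zero expected count disables the check; otherwise the number of
// received chunks must match exactly before a commit may proceed.
func validateChunkCount(expected, got int64) error {
    if expected == 0 {
        return nil // client did not declare a count
    }
    if got != expected {
        // The new test below asserts codes.FailedPrecondition for this case.
        return status.Errorf(codes.FailedPrecondition,
            "chunk count mismatch: expected %d, got %d", expected, got)
    }
    return nil
}
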
89 changes: 89 additions & 0 deletions internal/app/server/open_saves_test.go
@@ -1557,6 +1557,95 @@ func TestOpenSaves_UploadChunkedBlob(t *testing.T) {
        SessionId: sessionId,
    }); assert.NoError(t, err) {
        assert.Equal(t, int64(len(testChunk)*chunkCount), meta.Size)
        assert.True(t, meta.Chunked)
        assert.Equal(t, int64(chunkCount), meta.ChunkCount)
        assert.False(t, meta.HasCrc32C)
        assert.Empty(t, meta.Md5)
        assert.Equal(t, store.Key, meta.StoreKey)
        assert.Equal(t, record.Key, meta.RecordKey)
    }

    // Check if the metadata is reflected to the record as well.
    if updatedRecord, err := client.GetRecord(ctx, &pb.GetRecordRequest{
        StoreKey: store.Key, Key: record.Key,
    }); assert.NoError(t, err) {
        if assert.NotNil(t, updatedRecord) {
            assert.Equal(t, int64(len(testChunk)*chunkCount), updatedRecord.BlobSize)
            assert.Equal(t, int64(chunkCount), updatedRecord.ChunkCount)
            assert.True(t, updatedRecord.Chunked)
            assert.True(t, record.GetCreatedAt().AsTime().Equal(updatedRecord.GetCreatedAt().AsTime()))
            assert.True(t, beforeCreateChunk.Before(updatedRecord.GetUpdatedAt().AsTime()))
            assert.NotEqual(t, record.Signature, updatedRecord.Signature)
        }
    }

    for i := 0; i < chunkCount; i++ {
        verifyChunk(ctx, t, client, store.Key, record.Key, sessionId, int64(i), testChunk)
    }

    // Deletion test
    if _, err := client.DeleteBlob(ctx, &pb.DeleteBlobRequest{
        StoreKey:  store.Key,
        RecordKey: record.Key,
    }); err != nil {
        t.Errorf("DeleteBlob failed: %v", err)
    }
    verifyBlob(ctx, t, client, store.Key, record.Key, make([]byte, 0))
}

func TestOpenSaves_UploadChunkedBlobWithChunkCount(t *testing.T) {
    ctx := context.Background()
    _, listener := getOpenSavesServer(ctx, t, "gcp")
    _, client := getTestClient(ctx, t, listener)
    store := &pb.Store{Key: uuid.NewString()}
    setupTestStore(ctx, t, client, store)
    record := &pb.Record{Key: uuid.NewString()}
    record = setupTestRecord(ctx, t, client, store.Key, record)

    const chunkSize = 1025
    const chunkCount = 4
    testChunk := make([]byte, chunkSize)
    for i := 0; i < chunkSize; i++ {
        testChunk[i] = byte(i % 256)
    }

    beforeCreateChunk := time.Now()
    var sessionId string

    if res, err := client.CreateChunkedBlob(ctx, &pb.CreateChunkedBlobRequest{
        StoreKey:   store.Key,
        RecordKey:  record.Key,
        ChunkSize:  chunkSize,
        ChunkCount: chunkCount,
    }); assert.NoError(t, err) {
        if assert.NotNil(t, res) {
            _, err := uuid.Parse(res.SessionId)
            assert.NoError(t, err)
            sessionId = res.SessionId
        }
    }
    t.Cleanup(func() {
        client.DeleteBlob(ctx, &pb.DeleteBlobRequest{StoreKey: store.Key, RecordKey: record.Key})
    })

    for i := 0; i < chunkCount-1; i++ {
        uploadChunk(ctx, t, client, sessionId, int64(i), testChunk)
        // UploadChunk shouldn't update Signature.
        if actual, err := client.GetRecord(ctx, &pb.GetRecordRequest{StoreKey: store.Key, Key: record.Key}); assert.NoError(t, err) {
            assert.Equal(t, record.Signature, actual.Signature)
        }
        b, err := client.CommitChunkedUpload(ctx, &pb.CommitChunkedUploadRequest{SessionId: sessionId})
        assert.Nil(t, b)
        assert.Equal(t, codes.FailedPrecondition, status.Code(err))
    }
    uploadChunk(ctx, t, client, sessionId, chunkCount-1, testChunk)

    if meta, err := client.CommitChunkedUpload(ctx, &pb.CommitChunkedUploadRequest{
        SessionId: sessionId,
    }); assert.NoError(t, err) {
        assert.Equal(t, int64(len(testChunk)*chunkCount), meta.Size)
        assert.True(t, meta.Chunked)
        assert.Equal(t, int64(chunkCount), meta.ChunkCount)
        assert.False(t, meta.HasCrc32C)
        assert.Empty(t, meta.Md5)
        assert.Equal(t, store.Key, meta.StoreKey)
25 changes: 16 additions & 9 deletions internal/pkg/metadb/blobref/blobref.go
@@ -38,6 +38,10 @@ type BlobRef struct {
     RecordKey string
     // Chunked is whether the BlobRef is chunked or not.
     Chunked bool
+    // ChunkCount is the number of chunks that should be associated to the BlobRef.
+    // It is set by either the client when starting a chunk upload or
+    // the server when committing a chunked upload.
+    ChunkCount int64
 
     // Checksums have checksums for each blob object associated with the BlobRef entity.
     // Record.{MD5,CRC32C} must be used for inline blobs, and
@@ -95,12 +99,13 @@ func NewBlobRef(size int64, storeKey, recordKey string) *BlobRef {
     }
 }
 
-// NewChunkedBlobRef creates a new BlobRef object with Size and Chunked
-// set 0 and true, respectively.
+// NewChunkedBlobRef creates a new BlobRef object with Size, Chunked, and
+// ChunkCount set to 0, true, and chunkCount respectively.
 // Other behaviors are the same as NewBlobRef
-func NewChunkedBlobRef(storeKey, recordKey string) *BlobRef {
+func NewChunkedBlobRef(storeKey, recordKey string, chunkCount int64) *BlobRef {
     b := NewBlobRef(0, storeKey, recordKey)
     b.Chunked = true
+    b.ChunkCount = chunkCount
     return b
 }
@@ -112,11 +117,13 @@ func (b *BlobRef) ObjectPath() string {
 
 // ToProto returns a BlobMetadata representation of the object.
 func (b *BlobRef) ToProto() *pb.BlobMetadata {
     return &pb.BlobMetadata{
-        StoreKey:  b.StoreKey,
-        RecordKey: b.RecordKey,
-        Size:      b.Size,
-        Md5:       b.MD5,
-        Crc32C:    b.GetCRC32C(),
-        HasCrc32C: b.HasCRC32C,
+        StoreKey:   b.StoreKey,
+        RecordKey:  b.RecordKey,
+        Size:       b.Size,
+        Md5:        b.MD5,
+        Chunked:    b.Chunked,
+        ChunkCount: b.ChunkCount,
+        Crc32C:     b.GetCRC32C(),
+        HasCrc32C:  b.HasCRC32C,
     }
 }
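
As a quick illustration of the updated constructor and ToProto, the sketch below builds a chunked BlobRef and inspects the resulting metadata. It only compiles inside this module (blobref is an internal package), and the keys are made up:

package main

import (
    "fmt"

    "github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
)

func main() {
    // Keys are made up; NewChunkedBlobRef pins Size to 0 and Chunked to true,
    // and ChunkCount now flows through to the BlobMetadata proto.
    b := blobref.NewChunkedBlobRef("store-1", "record-1", 4)
    meta := b.ToProto()
    fmt.Println(meta.GetChunked(), meta.GetChunkCount(), meta.GetSize()) // true 4 0
}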