Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new commit with upload method #402

Merged
merged 6 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
593 changes: 303 additions & 290 deletions api/open_saves.pb.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion api/open_saves.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ service OpenSaves {
rpc UploadChunk(stream UploadChunkRequest) returns (ChunkMetadata) {}

// CommitChunkedUpload commits a chunked blob upload session and
// makes the blob available for reads.
// makes the blob available for reads. An optional record can be passed
// to perform an update within the same transaction.
rpc CommitChunkedUpload(CommitChunkedUploadRequest) returns (BlobMetadata) {}

// AbortChunkedUploads aborts a chunked blob upload session and
Expand Down Expand Up @@ -578,6 +579,9 @@ message CommitChunkedUploadRequest {

// Performance hints.
Hint hint = 2;

// Optional record object to update during the commit operation
Record record = 4;
}

message AbortChunkedUploadRequest {
Expand Down
10 changes: 8 additions & 2 deletions api/open_saves_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ when creating (uploading) a new blob object.
| ----- | ---- | ----- | ----------- |
| session_id | [string](#string) | | session_id is the ID of a chunked upload session to commit. |
| hint | [Hint](#opensaves-Hint) | | Performance hints. |
| record | [Record](#opensaves-Record) | | Optional record object to update during the commit operation |



Expand Down Expand Up @@ -815,7 +816,7 @@ Public interface of the Open Saves service.
| CreateBlob | [CreateBlobRequest](#opensaves-CreateBlobRequest) stream | [BlobMetadata](#opensaves-BlobMetadata) | CreateBlob adds a new blob to a record. |
| CreateChunkedBlob | [CreateChunkedBlobRequest](#opensaves-CreateChunkedBlobRequest) | [CreateChunkedBlobResponse](#opensaves-CreateChunkedBlobResponse) | CreateChunkedBlob starts a new chunked blob upload session. |
| UploadChunk | [UploadChunkRequest](#opensaves-UploadChunkRequest) stream | [ChunkMetadata](#opensaves-ChunkMetadata) | UploadChunk uploads and stores each chunk. |
| CommitChunkedUpload | [CommitChunkedUploadRequest](#opensaves-CommitChunkedUploadRequest) | [BlobMetadata](#opensaves-BlobMetadata) | CommitChunkedUpload commits a chunked blob upload session and makes the blob available for reads. |
| CommitChunkedUpload | [CommitChunkedUploadRequest](#opensaves-CommitChunkedUploadRequest) | [BlobMetadata](#opensaves-BlobMetadata) | CommitChunkedUpload commits a chunked blob upload session and makes the blob available for reads. An optional record can be passed to perform an update within the same transaction. |
| AbortChunkedUpload | [AbortChunkedUploadRequest](#opensaves-AbortChunkedUploadRequest) | [.google.protobuf.Empty](#google-protobuf-Empty) | AbortChunkedUploads aborts a chunked blob upload session and discards temporary objects. |
| GetBlob | [GetBlobRequest](#opensaves-GetBlobRequest) | [GetBlobResponse](#opensaves-GetBlobResponse) stream | GetBlob retrieves a blob object in a record. Currently this method does not support chunked blobs and returns an UNIMPLEMENTED error if called for chunked blobs. TODO(yuryu): Support chunked blobs and return such objects entirely. |
| GetBlobChunk | [GetBlobChunkRequest](#opensaves-GetBlobChunkRequest) | [GetBlobChunkResponse](#opensaves-GetBlobChunkResponse) stream | GetBlobChunk returns a chunk of a blob object uploaded using CreateChunkedBlob. It returns an INVALID_ARGUMENT error if the blob is not a chunked object. |
Expand Down
43 changes: 37 additions & 6 deletions internal/app/server/open_saves.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,44 @@ func (s *openSavesServer) CommitChunkedUpload(ctx context.Context, req *pb.Commi
log.Errorf("Cannot retrieve chunked blob metadata for session (%v): %v", blobKey, err)
return nil, err
}
record, _, err := s.metaDB.PromoteBlobRefToCurrent(ctx, blob)
if err != nil {
log.Errorf("PromoteBlobRefToCurrent failed for object %v: %v", blob.ObjectPath(), err)
// Do not delete the blob object here. Leave it to the garbage collector.
return nil, err
// The record in the request is optional
var updateTo *record.Record
storeKey := blob.StoreKey
pbRecord := req.GetRecord()
if pbRecord != nil {
updateTo, err = record.FromProto(storeKey, pbRecord)
if err != nil {
log.Errorf("Invalid proto for store (%s), record (%s): %v", storeKey, pbRecord.GetKey(), err)
return nil, status.Errorf(codes.InvalidArgument, "Invalid record proto: %v", err)
}
}
var updatedRecord *record.Record
if updateTo != nil {
updatedRecord, _, err = s.metaDB.PromoteBlobRefWithRecordUpdater(ctx, blob, updateTo, func(r *record.Record) (*record.Record, error) {
r.BlobSize = blob.Size
r.ExternalBlob = blob.Key
r.Chunked = blob.Chunked
r.Timestamps.Update()
r.OwnerID = updateTo.OwnerID
r.Properties = updateTo.Properties
r.Tags = updateTo.Tags
r.OpaqueString = updateTo.OpaqueString
return r, nil
})
if err != nil {
log.Errorf("PromoteBlobRefWithRecordUpdater failed for object %v: %v", blob.ObjectPath(), err)
// Do not delete the blob object here. Leave it to the garbage collector.
return nil, err
}
} else {
updatedRecord, _, err = s.metaDB.PromoteBlobRefToCurrent(ctx, blob)
if err != nil {
log.Errorf("PromoteBlobRefToCurrent failed for object %v: %v", blob.ObjectPath(), err)
// Do not delete the blob object here. Leave it to the garbage collector.
return nil, err
}
}
s.cacheRecord(ctx, record, req.GetHint())
s.cacheRecord(ctx, updatedRecord, req.GetHint())
return blob.ToProto(), nil
}

Expand Down
95 changes: 94 additions & 1 deletion internal/app/server/open_saves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func assertEqualRecord(t *testing.T, expected, actual *pb.Record) {
assert.Equal(t, expected.BlobSize, actual.BlobSize)
assert.ElementsMatch(t, expected.Tags, actual.Tags)
assert.Equal(t, expected.OwnerId, actual.OwnerId)
// assert.Equal(t, expectecd.Properties, actual.Properties) doesn't work.
// assert.Equal(t, expected.Properties, actual.Properties) doesn't work.
// See Issue #138
assert.Equal(t, len(expected.Properties), len(actual.Properties))
for k, v := range expected.Properties {
Expand Down Expand Up @@ -1896,6 +1896,99 @@ func TestOpenSaves_UploadChunkedBlobWithChunkCount(t *testing.T) {
verifyBlob(ctx, t, client, store.Key, record.Key, make([]byte, 0))
}

// TestOpenSaves_UploadChunkedBlobWithUpdateRecord exercises the chunked-upload
// flow end to end and verifies that a record passed to CommitChunkedUpload is
// applied atomically with the blob promotion: the committed blob metadata is
// reflected on the record, the record's properties are updated, and its
// signature changes while CreatedAt is preserved.
func TestOpenSaves_UploadChunkedBlobWithUpdateRecord(t *testing.T) {
	ctx := context.Background()
	_, listener := getOpenSavesServer(ctx, t, "gcp")
	_, client := getTestClient(ctx, t, listener)

	// Fresh store and record for isolation from other tests.
	store := &pb.Store{Key: uuid.NewString()}
	setupTestStore(ctx, t, client, store)
	record := &pb.Record{Key: uuid.NewString()}
	record = setupTestRecord(ctx, t, client, store.Key, record)

	// Non-power-of-two chunk size to catch boundary assumptions in chunking.
	const chunkSize = 1*1024*1024 + 13 // 1 Mi + 13 B
	const chunkCount = 4
	testChunk := make([]byte, chunkSize)
	for i := 0; i < chunkSize; i++ {
		testChunk[i] = byte(i % 256)
	}

	// Captured before the session starts so we can assert UpdatedAt advances.
	beforeCreateChunk := time.Now()
	var sessionId string
	if res, err := client.CreateChunkedBlob(ctx, &pb.CreateChunkedBlobRequest{
		StoreKey:  store.Key,
		RecordKey: record.Key,
		ChunkSize: chunkSize,
	}); assert.NoError(t, err) {
		if assert.NotNil(t, res) {
			// Session IDs are expected to be valid UUIDs.
			_, err := uuid.Parse(res.SessionId)
			assert.NoError(t, err)
			sessionId = res.SessionId
		}
	}
	// Best-effort cleanup; errors are ignored because the blob may already
	// have been deleted by the deletion test below.
	t.Cleanup(func() {
		client.DeleteBlob(ctx, &pb.DeleteBlobRequest{StoreKey: store.Key, RecordKey: record.Key})
	})

	for i := 0; i < chunkCount; i++ {
		uploadChunk(ctx, t, client, sessionId, int64(i), testChunk)
		// UploadChunk shouldn't update Signature.
		if actual, err := client.GetRecord(ctx, &pb.GetRecordRequest{StoreKey: store.Key, Key: record.Key}); assert.NoError(t, err) {
			assert.Equal(t, record.Signature, actual.Signature)
		}
	}

	// NOTE(review): updateTo aliases record (same *pb.Record), so setting
	// Properties below also mutates record. Harmless here because record is
	// only read for Key/Signature/CreatedAt afterwards, but a clone would be
	// safer if the test grows.
	updateTo := record
	stringVal1 := &pb.Property_StringValue{StringValue: "foo"}
	updateTo.Properties = map[string]*pb.Property{
		"prop1": {
			Type:  pb.Property_STRING,
			Value: stringVal1,
		},
	}

	// Commit with the optional record attached; the server should apply the
	// property update in the same transaction as the blob promotion.
	if meta, err := client.CommitChunkedUpload(ctx, &pb.CommitChunkedUploadRequest{
		SessionId: sessionId,
		Record:    updateTo,
	}); assert.NoError(t, err) {
		assert.Equal(t, int64(len(testChunk)*chunkCount), meta.Size)
		assert.True(t, meta.Chunked)
		assert.Equal(t, int64(chunkCount), meta.ChunkCount)
		assert.False(t, meta.HasCrc32C)
		assert.Empty(t, meta.Md5)
		assert.Equal(t, store.Key, meta.StoreKey)
		assert.Equal(t, record.Key, meta.RecordKey)
	}

	// Check if the metadata is reflected to the record as well.
	updatedRecord, err := client.GetRecord(ctx, &pb.GetRecordRequest{
		StoreKey: store.Key, Key: record.Key,
	})
	if assert.NoError(t, err) {
		if assert.NotNil(t, updatedRecord) {
			assert.Equal(t, int64(len(testChunk)*chunkCount), updatedRecord.BlobSize)
			assert.Equal(t, int64(chunkCount), updatedRecord.ChunkCount)
			assert.True(t, updatedRecord.Chunked)
			// CreatedAt must survive the commit; UpdatedAt must advance.
			assert.True(t, record.GetCreatedAt().AsTime().Equal(updatedRecord.GetCreatedAt().AsTime()))
			assert.True(t, beforeCreateChunk.Before(updatedRecord.GetUpdatedAt().AsTime()))
			// The commit writes the record, so the signature must change.
			assert.NotEqual(t, record.Signature, updatedRecord.Signature)
			assert.Equal(t, updatedRecord.Properties["prop1"].Value, stringVal1)
		}
	}

	// The committed chunks must be readable back byte-for-byte.
	for i := 0; i < chunkCount; i++ {
		verifyChunk(ctx, t, client, store.Key, record.Key, sessionId, int64(i), testChunk)
	}

	// Deletion test
	if _, err := client.DeleteBlob(ctx, &pb.DeleteBlobRequest{
		StoreKey:  store.Key,
		RecordKey: record.Key,
	}); err != nil {
		t.Errorf("DeleteBlob failed: %v", err)
	}
	verifyBlob(ctx, t, client, store.Key, record.Key, make([]byte, 0))
}

func TestOpenSaves_LongOpaqueStrings(t *testing.T) {
t.Parallel()

Expand Down
73 changes: 73 additions & 0 deletions internal/pkg/metadb/metadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,79 @@ func (m *MetaDB) PromoteBlobRefToCurrent(ctx context.Context, blob *blobref.Blob
return record, blob, nil
}

// PromoteBlobRefWithRecordUpdater promotes the provided BlobRef object as the
// current external blob reference and updates the owning record in one
// Datastore transaction.
//
// updateTo is only consulted for its Timestamps.Signature: when non-nil and
// non-zero, it must match the stored record's signature (optimistic
// concurrency). The actual record mutation is delegated to updater, which
// receives the record loaded inside the transaction (after blob bookkeeping
// has been applied) and returns the record to persist.
//
// Returned errors:
//   - NotFound: the specified record or the blobref was not found
//   - Aborted: the signature in updateTo doesn't match the stored record
//   - FailedPrecondition: the expected chunk count doesn't match the actual count
//   - Internal: BlobRef status transition error
func (m *MetaDB) PromoteBlobRefWithRecordUpdater(ctx context.Context, blob *blobref.BlobRef, updateTo *record.Record, updater RecordUpdater) (*record.Record, *blobref.BlobRef, error) {
	record := new(record.Record)
	_, err := m.client.RunInTransaction(ctx, func(tx *ds.Transaction) error {
		rkey := m.createRecordKey(blob.StoreKey, blob.RecordKey)
		if err := tx.Get(rkey, record); err != nil {
			return err
		}
		// Optimistic concurrency check against the caller-provided signature.
		if updateTo != nil && updateTo.Timestamps.Signature != uuid.Nil && record.Timestamps.Signature != updateTo.Timestamps.Signature {
			return status.Errorf(codes.Aborted, "Signature mismatch: expected (%v), actual (%v)",
				updateTo.Timestamps.Signature.String(), record.Timestamps.Signature.String())
		}
		if record.ExternalBlob == uuid.Nil {
			// Simply add the new blob if previously didn't have a blob
			record = removeInlineBlob(record)
		} else {
			// Mark previous blob for deletion
			oldBlob, err := m.getBlobRef(ctx, tx, record.ExternalBlob)
			if err != nil {
				return err
			}
			record, err = m.markBlobRefForDeletion(tx, record, oldBlob, blob.Key)
			if err != nil {
				return err
			}
		}

		// Update the blob size for chunked uploads
		if blob.Chunked {
			size, count, err := m.chunkObjectsSizeSum(ctx, tx, blob)
			if err != nil {
				return err
			}
			if blob.ChunkCount != 0 && blob.ChunkCount != count {
				return status.Errorf(codes.FailedPrecondition, "expected chunk count doesn't match: expected (%v), actual (%v)", blob.ChunkCount, count)
			}
			blob.ChunkCount = count
			record.ChunkCount = count
			blob.Size = size
		} else {
			record.ChunkCount = 0
		}
		if blob.Status != blobref.StatusReady && blob.Ready() != nil {
			return status.Error(codes.Internal, "blob is not ready to become current")
		}
		blob.Timestamps.Update()
		if _, err := tx.Mutate(ds.NewUpdate(m.createBlobKey(blob.Key), blob)); err != nil {
			return err
		}

		// Call the custom defined updater method to modify the record.
		// BUGFIX: assign to the outer record variable instead of declaring a
		// shadowed one with ':='. The previous code persisted the shadowed
		// copy but returned the stale pre-updater record whenever the updater
		// returned a different object than it received.
		updated, err := updater(record)
		if err != nil {
			return err
		}
		record = updated
		return m.mutateSingleInTransaction(tx, ds.NewUpdate(rkey, record))
	})
	if err != nil {
		return nil, nil, datastoreErrToGRPCStatus(err)
	}
	return record, blob, nil
}

// RemoveBlobFromRecord removes the ExternalBlob from the record specified by
// storeKey and recordKey. It also changes the status of the blob object to
// BlobRefStatusPendingDeletion.
Expand Down