Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove extra writes when uploading chunked data #405

Merged
merged 4 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions internal/app/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ const (
// TODO(yuryu): Make these configurable
testProject = "triton-for-games-dev"
testBucket = "gs://triton-integration"
testBufferSize = 1024 * 1024
testCacheAddr = "localhost:6379"
storeKind = "store"
recordKind = "record"
blobKind = "blob"
chunkKind = "chunk"
testTimeThreshold = -1 * time.Hour
Expand Down Expand Up @@ -120,10 +117,10 @@ func chunkRefKey(chunk *chunkref.ChunkRef) *datastore.Key {
datastore.NameKey(blobKind, chunk.BlobRef.String(), nil))
}

func setupTestChunkRef(ctx context.Context, t *testing.T, collector *Collector, ds *datastore.Client, chunk *chunkref.ChunkRef) {
func setupTestChunkRef(ctx context.Context, t *testing.T, collector *Collector, ds *datastore.Client, blob *blobref.BlobRef, chunk *chunkref.ChunkRef) {
t.Helper()

if err := collector.metaDB.InsertChunkRef(ctx, chunk); err != nil {
if err := collector.metaDB.InsertChunkRef(ctx, blob, chunk); err != nil {
t.Fatalf("InsertChunkRef failed: %v", err)
}
t.Cleanup(func() {
Expand Down Expand Up @@ -254,7 +251,7 @@ func TestCollector_DeletesChunkedBlobs(t *testing.T) {
chunks[3].Fail()

for _, c := range chunks {
setupTestChunkRef(ctx, t, collector, ds, c)
setupTestChunkRef(ctx, t, collector, ds, blob, c)
setupExternalBlob(ctx, t, collector, c.ObjectPath())
}
collector.run(ctx)
Expand Down
53 changes: 29 additions & 24 deletions internal/app/server/open_saves.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,6 @@ func (s *openSavesServer) blobRefFail(ctx context.Context, blobref *blobref.Blob
}
}

func (s *openSavesServer) chunkRefFail(ctx context.Context, chunk *chunkref.ChunkRef) {
chunk.Fail()
if err := s.metaDB.UpdateChunkRef(ctx, chunk); err != nil {
log.Errorf("Failed to mark ChunkRef (%v) as Failed: %v", chunk.Key, err)
}
}

func (s *openSavesServer) insertExternalBlob(ctx context.Context, stream pb.OpenSaves_CreateBlobServer, meta *pb.BlobMetadata) error {
log.Debugf("Inserting external blob: %v\n", meta)
// Create a blob reference based on the metadata.
Expand Down Expand Up @@ -539,21 +532,23 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) err
return status.Errorf(codes.InvalidArgument, "SessionId is not a valid UUID string: %v", err)
}

// Create a chunk reference based on the metadata.
// Create a chunk reference based on the metadata. Do not add to blobref right away to minimize writes
chunk := chunkref.New(blobKey, int32(meta.GetNumber()))
if err := s.metaDB.InsertChunkRef(ctx, chunk); err != nil {
blob, err := s.metaDB.ValidateChunkRefPreconditions(ctx, chunk)
if err != nil {
return err
}

writer, err := s.blobStore.NewWriter(ctx, chunk.ObjectPath())
contextWithCancel, cancel := context.WithCancel(ctx)
writer, err := s.blobStore.NewWriter(contextWithCancel, chunk.ObjectPath())
if err != nil {
cancel()
return err
}
defer func() {
if writer != nil {
writer.Close()
// This means an abnormal exit, so make sure to mark the blob as Fail.
s.chunkRefFail(ctx, chunk)
cancel()
_ = writer.Close()
}
}()

Expand All @@ -577,7 +572,7 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) err
log.Errorf("UploadChunk: BlobStore write error: %v", err)
return err
}
digest.Write(fragment)
_, _ = digest.Write(fragment)
written += n
// TODO(yuryu): This is not suitable for unit tests until we make the value
// configurable, or have a BlobStore mock.
Expand All @@ -591,33 +586,43 @@ func (s *openSavesServer) UploadChunk(stream pb.OpenSaves_UploadChunkServer) err
writer = nil
if err != nil {
log.Errorf("writer.Close() failed on chunk object %v: %v", chunk.ObjectPath(), err)
s.chunkRefFail(ctx, chunk)
// The uploaded object can be deleted immediately.
if derr := s.blobStore.Delete(ctx, chunk.ObjectPath()); derr != nil {
log.Errorf("Delete chunk failed after writer.Close() error: %v", derr)
}
return err
}

// Update the chunk size based on the actual bytes written
// MarkChunkRefReady commits the new change to Datastore
chunk.Size = int32(written)
chunk.Checksums = digest.Checksums()

if err := chunk.Ready(); err != nil {
_ = s.deleteObjectOnExit(ctx, chunk.ObjectPath())
log.Error(err)
return err
}
chunk.Timestamps.Update()

if err := chunk.ValidateIfPresent(meta); err != nil {
_ = s.deleteObjectOnExit(ctx, chunk.ObjectPath())
log.Error(err)
return err
}

if err := s.metaDB.MarkChunkRefReady(ctx, chunk); err != nil {
log.Errorf("Failed to update chunkref metadata (%v): %v", chunk.Key, err)
s.chunkRefFail(ctx, chunk)
// Do not delete the chunk object here. Leave it to the garbage collector.
if err := s.metaDB.InsertChunkRef(ctx, blob, chunk); err != nil {
_ = s.deleteObjectOnExit(ctx, chunk.ObjectPath())
log.Errorf("Failed to insert chunkref metadata (%v), blobref (%v): %v", chunk.Key, chunk.BlobRef, err)
return err
}
return stream.SendAndClose(chunk.ToProto())
}

// deleteObjectOnExit deletes the object at path from the blob store. It is
// called on the error paths of a chunked upload (failed state transition,
// metadata validation, or chunkref insertion) so a partially uploaded object
// does not linger in storage. A delete failure is logged and returned; callers
// may ignore the returned error since the garbage collector will eventually
// reclaim the orphaned object.
func (s *openSavesServer) deleteObjectOnExit(ctx context.Context, path string) error {
	err := s.blobStore.Delete(ctx, path)
	if err != nil {
		// Log with the object path so the orphan can be found; the original
		// message wrongly claimed this only runs after a writer.Close() error.
		log.Errorf("deleteObjectOnExit: failed to delete object (%v): %v", path, err)
	}
	return err
}

func (s *openSavesServer) CommitChunkedUpload(ctx context.Context, req *pb.CommitChunkedUploadRequest) (*pb.BlobMetadata, error) {
blobKey, err := uuid.Parse(req.GetSessionId())
if err != nil {
Expand Down
46 changes: 17 additions & 29 deletions internal/pkg/metadb/metadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,22 +898,26 @@ func (m *MetaDB) FindChunkRefByNumber(ctx context.Context, storeKey, recordKey s
return nil, status.Errorf(codes.NotFound, "chunk number (%v) was not found for record (%v)", number, recordKey)
}

// MarkChunkRefReady marks the specified chunk as Ready. If the current session has another chunk
// ValidateChunkRefPreconditions checks that the parent BlobRef of chunk exists
// and is marked as chunked before any chunk data is uploaded. On success it
// returns the fetched BlobRef so callers can reuse it (e.g. pass it to
// InsertChunkRef) without an extra Datastore read. Returns a FailedPrecondition
// error if the blob exists but is not chunked, or the lookup error otherwise.
func (m *MetaDB) ValidateChunkRefPreconditions(ctx context.Context, chunk *chunkref.ChunkRef) (*blobref.BlobRef, error) {
	// Read outside a transaction (tx == nil): this is a cheap precondition
	// probe; the authoritative check happens when the chunkref is inserted.
	blob, err := m.getBlobRef(ctx, nil, chunk.BlobRef)
	if err != nil {
		return nil, err
	}
	if !blob.Chunked {
		return nil, status.Errorf(codes.FailedPrecondition, "BlobRef (%v) is not chunked", chunk.BlobRef)
	}
	return blob, nil
}

// InsertChunkRef inserts a new ChunkRef object to the datastore. If the current session has another chunk
// with the same Number, it will be marked for deletion.
func (m *MetaDB) MarkChunkRefReady(ctx context.Context, chunk *chunkref.ChunkRef) error {
func (m *MetaDB) InsertChunkRef(ctx context.Context, blob *blobref.BlobRef, chunk *chunkref.ChunkRef) error {
_, err := m.client.RunInTransaction(ctx, func(tx *ds.Transaction) error {
blob, err := m.getBlobRef(ctx, tx, chunk.BlobRef)
if err != nil {
return err
}

// Mark the chunk ready (Datastore's transaction isolation guarantees this is not visible until
// we commit the transaction).
if err := chunk.Ready(); err != nil {
return err
}
chunk.Timestamps.Update()
if err := m.mutateSingleInTransaction(tx, ds.NewUpdate(m.createChunkRefKey(blob.Key, chunk.Key), chunk)); err != nil {
mut := ds.NewInsert(m.createChunkRefKey(chunk.BlobRef, chunk.Key), chunk)
if err := m.mutateSingleInTransaction(tx, mut); err != nil {
return err
}

Expand All @@ -938,22 +942,6 @@ func (m *MetaDB) MarkChunkRefReady(ctx context.Context, chunk *chunkref.ChunkRef
return err
}

// InsertChunkRef inserts a new ChunkRef object to the datastore.
func (m *MetaDB) InsertChunkRef(ctx context.Context, chunk *chunkref.ChunkRef) error {
_, err := m.client.RunInTransaction(ctx, func(tx *ds.Transaction) error {
if blob, err := m.getBlobRef(ctx, tx, chunk.BlobRef); err != nil {
return err
} else {
if !blob.Chunked {
return status.Errorf(codes.FailedPrecondition, "BlobRef (%v) is not chunked", chunk.BlobRef)
}
}
mut := ds.NewInsert(m.createChunkRefKey(chunk.BlobRef, chunk.Key), chunk)
return m.mutateSingleInTransaction(tx, mut)
})
return err
}

// UpdateChunkRef updates a ChunkRef object with the new property values.
// Returns a NotFound error if the key is not found.
func (m *MetaDB) UpdateChunkRef(ctx context.Context, chunk *chunkref.ChunkRef) error {
Expand Down
42 changes: 17 additions & 25 deletions internal/pkg/metadb/metadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ import (
)

const (
storeKind = "store"
recordKind = "record"
blobKind = "blob"
chunkKind = "chunk"
timestampTestDelta = 5 * time.Second
testProject = "triton-for-games-dev"
testNamespace = "datastore-unittests"
blobKind = "blob"
chunkKind = "chunk"
testProject = "triton-for-games-dev"
testNamespace = "datastore-unittests"
)

func TestMetaDB_NewMetaDB(t *testing.T) {
Expand Down Expand Up @@ -169,9 +166,9 @@ func setupTestBlobRef(ctx context.Context, t *testing.T, metaDB *m.MetaDB, blob
return got
}

func setupTestChunkRef(ctx context.Context, t *testing.T, metaDB *m.MetaDB, chunk *chunkref.ChunkRef) {
func setupTestChunkRef(ctx context.Context, t *testing.T, metaDB *m.MetaDB, blob *blobref.BlobRef, chunk *chunkref.ChunkRef) {
t.Helper()
if err := metaDB.InsertChunkRef(ctx, chunk); err != nil {
if err := metaDB.InsertChunkRef(ctx, blob, chunk); err != nil {
t.Fatalf("InsertChunkRef() failed for chunk key (%v): %v", chunk.Key, err)
}

Expand Down Expand Up @@ -977,23 +974,21 @@ func TestMetaDB_SimpleCreateGetDeleteChunkedBlob(t *testing.T) {
chunks := []*chunkref.ChunkRef{}
for i := 0; i < testChunkCount; i++ {
chunk := chunkref.New(blob.Key, int32(i))
setupTestChunkRef(ctx, t, metaDB, chunk)
chunk.Size = testChunkSize
if err := chunk.Ready(); err != nil {
t.Fatalf("Ready() failed for chunk (%v): %v", chunk.Key, err)
}
setupTestChunkRef(ctx, t, metaDB, blob, chunk)
chunks = append(chunks, chunk)
}

// Mark the chunks ready
for _, chunk := range chunks {
chunk.Size = testChunkSize
now := time.Now()
if err := metaDB.MarkChunkRefReady(ctx, chunk); err != nil {
t.Fatalf("MarkChunkRefReady() failed for chunk (%v): %v", chunk.Key, err)
}
dsKey := chunkRefKey(chunk.BlobRef, chunk.Key)
got := new(chunkref.ChunkRef)
if err := ds.Get(ctx, dsKey, got); err != nil {
t.Errorf("Failed to get updated ChunkRef (%v): %v", dsKey, err)
} else {
assert.True(t, now.Before(got.Timestamps.UpdatedAt))
assert.Equal(t, blobref.StatusReady, got.Status)
}
}
Expand Down Expand Up @@ -1074,7 +1069,7 @@ func TestMetaDB_MarkUncommittedBlobForDeletion(t *testing.T) {
store, record, blob := setupTestStoreRecordBlobSet(ctx, t, metaDB, true)

chunk := chunkref.New(blob.Key, 0)
setupTestChunkRef(ctx, t, metaDB, chunk)
setupTestChunkRef(ctx, t, metaDB, blob, chunk)

assert.NoError(t, metaDB.MarkUncommittedBlobForDeletion(ctx, blob.Key))

Expand All @@ -1099,7 +1094,7 @@ func TestMetaDB_UpdateChunkRef(t *testing.T) {
_, _, blob := setupTestStoreRecordBlobSet(ctx, t, metaDB, true)

chunk := chunkref.New(blob.Key, 0)
setupTestChunkRef(ctx, t, metaDB, chunk)
setupTestChunkRef(ctx, t, metaDB, blob, chunk)

chunk.Fail()
assert.NoError(t, metaDB.UpdateChunkRef(ctx, chunk))
Expand All @@ -1125,13 +1120,10 @@ func TestMetaDB_MultipleChunksWithSameNumber(t *testing.T) {
chunkref.New(blob.Key, 0),
}

for _, chunk := range chunks {
setupTestChunkRef(ctx, t, metaDB, chunk)
}

for i, chunk := range chunks {
chunk.Size = int32(i)
assert.NoError(t, metaDB.MarkChunkRefReady(ctx, chunk))
assert.NoError(t, chunk.Ready())
setupTestChunkRef(ctx, t, metaDB, blob, chunk)
}

chunks[0].Status = blobref.StatusPendingDeletion
Expand Down Expand Up @@ -1177,7 +1169,7 @@ func TestMetaDB_GetChildChunkRefs(t *testing.T) {
chunks[2].Status = blobref.StatusPendingDeletion

for _, chunk := range chunks {
setupTestChunkRef(ctx, t, metaDB, chunk)
setupTestChunkRef(ctx, t, metaDB, blob, chunk)
}

cur := metaDB.GetChildChunkRefs(ctx, blob.Key)
Expand Down Expand Up @@ -1225,7 +1217,7 @@ func TestMetaDB_ListChunkRefsByStatus(t *testing.T) {
},
}
chunks = append(chunks, chunk)
setupTestChunkRef(ctx, t, metaDB, chunk)
setupTestChunkRef(ctx, t, metaDB, blob, chunk)
}

testCase := []struct {
Expand Down