Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cache)!: add support for cache hints #123

Merged
merged 5 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions api/triton.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,22 @@ message Record {

// Performance optimization hints for the server.
// The server may silently ignore the hints when not feasible.
enum Hint {
// Unspecified. This value must not be used.
HINT_UNSPECIFIED = 0;

// Do not cache the record for future requests.
DO_NOT_CACHE = 1;

// Skip the cache and always check the metadata server.
IGNORE_CACHE = 2;

// Always store the blob in the bulk storage
ALWAYS_USE_BULK_STORE = 3;

// Do not use the bulk storage. Always store the blob into the metadata
// entity. The server will return an error if the blob is too large. The exact
// size limit depends on the backend implementation.
DO_NOT_USE_BULK_STORE = 4;
message Hint {
// If true, do not cache the record for future requests.
bool do_not_cache = 1;

// If true, skip the cache check and always check the metadata server.
// If false, allow file size to determine cache checks.
bool skip_cache = 2;

// If true, always store the blob in blob storage, rather than in the metadata
// server. If false, allow file size to determine where to store the blob.
bool force_blob_store = 3;

// Tells the server to not use blob storage. Always store the blob into
// the metadata entity. The server will return an error if the blob is too
// large. The exact size limit depends on the backend implementation.
bool force_inline_blob = 4;
}

// Store represents an internal bucket to contain records.
Expand Down Expand Up @@ -246,7 +245,7 @@ message QueryRecordsResponse {

// Performance hints.
// Query caching is not supported at the moment
repeated Hint hints = 3;
Hint hint = 3;
}

message DeleteStoreRequest {
Expand All @@ -262,7 +261,7 @@ message CreateRecordRequest {
Record record = 2;

// Performance hints.
repeated Hint hints = 3;
Hint hint = 3;
}

message GetRecordRequest {
Expand All @@ -273,7 +272,7 @@ message GetRecordRequest {
string key = 2;

// Performance hints.
repeated Hint hints = 3;
Hint hint = 3;
}

message UpdateRecordRequest {
Expand All @@ -284,7 +283,7 @@ message UpdateRecordRequest {
Record record = 2;

// Performance hints.
repeated Hint hints = 3;
Hint hint = 3;
}

message DeleteRecordRequest {
Expand Down
115 changes: 71 additions & 44 deletions internal/app/server/triton.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type tritonServer struct {
cacheStore cache.Cache
}

// Assert tritonServer implements tritonpb.TritonServer
var _ tritonpb.TritonServer = new(tritonServer)

// newTritonServer creates a new instance of the triton server.
func newTritonServer(ctx context.Context, cloud, project, bucket, cacheAddr string) (*tritonServer, error) {
switch cloud {
Expand Down Expand Up @@ -82,7 +85,7 @@ func (s *tritonServer) CreateStore(ctx context.Context, req *tritonpb.CreateStor
log.Warnf("CreateStore failed for store (%s): %v", store.Key, err)
return nil, status.Convert(err).Err()
}
log.Infof("Created store: %+v", store)
log.Debugf("Created store: %+v", store)
return newStore.ToProto(), nil
}

Expand All @@ -95,19 +98,10 @@ func (s *tritonServer) CreateRecord(ctx context.Context, req *tritonpb.CreateRec
return nil, status.Convert(err).Err()
}

// Update cache store.
k := cache.FormatKey(req.GetStoreKey(), req.Record.GetKey())
rp := newRecord.ToProto()
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Errorf("failed to encode record for cache for key (%s): %v", k, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, k, by); err != nil {
log.Errorf("failed to update cache for key (%s): %v", k, err)
}
}
if shouldCache(req.Hint) {
k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey())
s.storeRecordInCache(ctx, k, rp)
}
return rp, nil
}
Expand All @@ -119,7 +113,7 @@ func (s *tritonServer) DeleteRecord(ctx context.Context, req *tritonpb.DeleteRec
req.GetStoreKey(), req.GetKey(), err)
return nil, status.Convert(err).Err()
}
log.Infof("Deleted record: store (%s), record (%s)",
log.Debugf("Deleted record: store (%s), record (%s)",
req.GetStoreKey(), req.GetKey())

// Purge record from cache store.
Expand Down Expand Up @@ -159,22 +153,20 @@ func (s *tritonServer) DeleteStore(ctx context.Context, req *tritonpb.DeleteStor
log.Warnf("DeleteStore failed for store (%s): %v", req.GetKey(), err)
return nil, status.Convert(err).Err()
}
log.Infof("Deletes store: %s", req.GetKey())
log.Debugf("Deletes store: %s", req.GetKey())
return new(empty.Empty), nil
}

func (s *tritonServer) GetRecord(ctx context.Context, req *tritonpb.GetRecordRequest) (*tritonpb.Record, error) {
k := cache.FormatKey(req.GetStoreKey(), req.GetKey())
r, err := s.cacheStore.Get(ctx, k)

// Cache hit, use value from cache store.
if err == nil {
re, err := cache.DecodeRecord(r)
if shouldCheckCache(req.Hint) {
r, err := s.getRecordFromCache(ctx, k)
if err != nil {
return nil, err
log.Debug("cache miss")
} else if r != nil {
return r, nil
}
log.Infof("cache hit: %+v", re)
return re, nil
}

record, err := s.metaDB.GetRecord(ctx, req.GetStoreKey(), req.GetKey())
Expand All @@ -183,20 +175,13 @@ func (s *tritonServer) GetRecord(ctx context.Context, req *tritonpb.GetRecordReq
req.GetStoreKey(), req.GetKey(), err)
return nil, status.Convert(err).Err()
}
log.Infof("Got record %+v", record)
log.Debugf("Got record %+v", record)

// Update cache store.
rp := record.ToProto()
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Errorf("failed to encode record for cache for key (%s): %v", k, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, k, by); err != nil {
log.Errorf("failed to update cache for key (%s): %v", k, err)
}
}

if shouldCache(req.Hint) {
s.storeRecordInCache(ctx, k, rp)
}

return rp, nil
Expand All @@ -212,18 +197,11 @@ func (s *tritonServer) UpdateRecord(ctx context.Context, req *tritonpb.UpdateRec
}

// Update cache store.
k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey())
rp := newRecord.ToProto()
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Errorf("failed to encode record for cache for key (%s): %v", k, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, k, by); err != nil {
log.Errorf("failed to update cache for key (%s): %v", k, err)
}
}

if shouldCache(req.Hint) {
k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey())
s.storeRecordInCache(ctx, k, rp)
}

return rp, nil
Expand All @@ -238,3 +216,52 @@ func (s *tritonServer) Ping(ctx context.Context, req *tritonpb.PingRequest) (*tr
Pong: req.GetPing(),
}, nil
}

func (s *tritonServer) getRecordFromCache(ctx context.Context, key string) (*tritonpb.Record, error) {
r, err := s.cacheStore.Get(ctx, key)
if err != nil {
// cache miss.
return nil, err
}
// cache hit, use value from cache store.
re, err := cache.DecodeRecord(r)
if err != nil {
return nil, err
}
log.Debugf("cache hit: %+v", re)
return re, nil
}

func (s *tritonServer) storeRecordInCache(ctx context.Context, key string, rp *tritonpb.Record) {
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Warnf("failed to encode record for cache for key (%s): %v", key, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, key, by); err != nil {
log.Warnf("failed to update cache for key (%s): %v", key, err)
}
}
}
}

// shouldCache returns whether or not triton should try to store
// the record in the cache store. Default behavior is to cache
// if hint is not specified.
func shouldCache(hint *tritonpb.Hint) bool {
if hint == nil {
return true
}
return !hint.DoNotCache
}

// shouldCheckCache returns whether or not triton should try to check
// the record in the cache store. Default behavior is to check
// the cache if hint is not specified.
func shouldCheckCache(hint *tritonpb.Hint) bool {
if hint == nil {
return true
}
return !hint.SkipCache
}
Loading