Skip to content

Commit

Permalink
feat(cache)!: add support for cache hints (#123)
Browse files Browse the repository at this point in the history
* feat(cache)!: add support for cache hints
  • Loading branch information
hongalex authored Oct 1, 2020
1 parent 46cecb4 commit be6ec74
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 69 deletions.
41 changes: 20 additions & 21 deletions api/triton.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,22 @@ message Record {

// Performance optimization hints for the server.
// The server may silently ignore the hints when not feasible.
enum Hint {
// Unspecified. This value must not be used.
HINT_UNSPECIFIED = 0;

// Do not cache the record for future requests.
DO_NOT_CACHE = 1;

// Skip the cache and always check the metadata server.
IGNORE_CACHE = 2;

// Always store the blob in the bulk storage
ALWAYS_USE_BULK_STORE = 3;

// Do not use the bulk storage. Always store the blob into the metadata
// entity. The server will return an error if the blob is too large. The exact
// size limit depends on the backend implementation.
DO_NOT_USE_BULK_STORE = 4;
message Hint {
// If true, do not cache the record for future requests.
bool do_not_cache = 1;

// If true, skip the cache check and always check the metadata server.
// If false, allow file size to determine cache checks.
bool skip_cache = 2;

// If true, always store the blob in blob storage, rather than in the metadata
// server. If false, allow file size to determine where to store the blob.
bool force_blob_store = 3;

// Tells the server to not use blob storage. Always store the blob into
// the metadata entity. The server will return an error if the blob is too
// large. The exact size limit depends on the backend implementation.
bool force_inline_blob = 4;
}

// Store represents an internal bucket to contain records.
Expand Down Expand Up @@ -246,7 +245,7 @@ message QueryRecordsResponse {

// Performance hints.
// Query caching is not supported at the moment
repeated Hint hints = 3;
Hint hint = 3;
}

message DeleteStoreRequest {
Expand All @@ -262,7 +261,7 @@ message CreateRecordRequest {
Record record = 2;

// Performance hints.
repeated Hint hints = 3;
Hint hint = 3;
}

message GetRecordRequest {
Expand All @@ -273,7 +272,7 @@ message GetRecordRequest {
string key = 2;

// Performance hints.
repeated Hint hints = 3;
Hint hint = 3;
}

message UpdateRecordRequest {
Expand All @@ -284,7 +283,7 @@ message UpdateRecordRequest {
Record record = 2;

// Performance hints.
repeated Hint hints = 3;
Hint hint = 3;
}

message DeleteRecordRequest {
Expand Down
115 changes: 71 additions & 44 deletions internal/app/server/triton.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type tritonServer struct {
cacheStore cache.Cache
}

// Assert tritonServer implements tritonpb.TritonServer
var _ tritonpb.TritonServer = new(tritonServer)

// newTritonServer creates a new instance of the triton server.
func newTritonServer(ctx context.Context, cloud, project, bucket, cacheAddr string) (*tritonServer, error) {
switch cloud {
Expand Down Expand Up @@ -82,7 +85,7 @@ func (s *tritonServer) CreateStore(ctx context.Context, req *tritonpb.CreateStor
log.Warnf("CreateStore failed for store (%s): %v", store.Key, err)
return nil, status.Convert(err).Err()
}
log.Infof("Created store: %+v", store)
log.Debugf("Created store: %+v", store)
return newStore.ToProto(), nil
}

Expand All @@ -95,19 +98,10 @@ func (s *tritonServer) CreateRecord(ctx context.Context, req *tritonpb.CreateRec
return nil, status.Convert(err).Err()
}

// Update cache store.
k := cache.FormatKey(req.GetStoreKey(), req.Record.GetKey())
rp := newRecord.ToProto()
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Errorf("failed to encode record for cache for key (%s): %v", k, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, k, by); err != nil {
log.Errorf("failed to update cache for key (%s): %v", k, err)
}
}
if shouldCache(req.Hint) {
k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey())
s.storeRecordInCache(ctx, k, rp)
}
return rp, nil
}
Expand All @@ -119,7 +113,7 @@ func (s *tritonServer) DeleteRecord(ctx context.Context, req *tritonpb.DeleteRec
req.GetStoreKey(), req.GetKey(), err)
return nil, status.Convert(err).Err()
}
log.Infof("Deleted record: store (%s), record (%s)",
log.Debugf("Deleted record: store (%s), record (%s)",
req.GetStoreKey(), req.GetKey())

// Purge record from cache store.
Expand Down Expand Up @@ -159,22 +153,20 @@ func (s *tritonServer) DeleteStore(ctx context.Context, req *tritonpb.DeleteStor
log.Warnf("DeleteStore failed for store (%s): %v", req.GetKey(), err)
return nil, status.Convert(err).Err()
}
log.Infof("Deletes store: %s", req.GetKey())
log.Debugf("Deletes store: %s", req.GetKey())
return new(empty.Empty), nil
}

func (s *tritonServer) GetRecord(ctx context.Context, req *tritonpb.GetRecordRequest) (*tritonpb.Record, error) {
k := cache.FormatKey(req.GetStoreKey(), req.GetKey())
r, err := s.cacheStore.Get(ctx, k)

// Cache hit, use value from cache store.
if err == nil {
re, err := cache.DecodeRecord(r)
if shouldCheckCache(req.Hint) {
r, err := s.getRecordFromCache(ctx, k)
if err != nil {
return nil, err
log.Debug("cache miss")
} else if r != nil {
return r, nil
}
log.Infof("cache hit: %+v", re)
return re, nil
}

record, err := s.metaDB.GetRecord(ctx, req.GetStoreKey(), req.GetKey())
Expand All @@ -183,20 +175,13 @@ func (s *tritonServer) GetRecord(ctx context.Context, req *tritonpb.GetRecordReq
req.GetStoreKey(), req.GetKey(), err)
return nil, status.Convert(err).Err()
}
log.Infof("Got record %+v", record)
log.Debugf("Got record %+v", record)

// Update cache store.
rp := record.ToProto()
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Errorf("failed to encode record for cache for key (%s): %v", k, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, k, by); err != nil {
log.Errorf("failed to update cache for key (%s): %v", k, err)
}
}

if shouldCache(req.Hint) {
s.storeRecordInCache(ctx, k, rp)
}

return rp, nil
Expand All @@ -212,18 +197,11 @@ func (s *tritonServer) UpdateRecord(ctx context.Context, req *tritonpb.UpdateRec
}

// Update cache store.
k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey())
rp := newRecord.ToProto()
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Errorf("failed to encode record for cache for key (%s): %v", k, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, k, by); err != nil {
log.Errorf("failed to update cache for key (%s): %v", k, err)
}
}

if shouldCache(req.Hint) {
k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey())
s.storeRecordInCache(ctx, k, rp)
}

return rp, nil
Expand All @@ -238,3 +216,52 @@ func (s *tritonServer) Ping(ctx context.Context, req *tritonpb.PingRequest) (*tr
Pong: req.GetPing(),
}, nil
}

func (s *tritonServer) getRecordFromCache(ctx context.Context, key string) (*tritonpb.Record, error) {
r, err := s.cacheStore.Get(ctx, key)
if err != nil {
// cache miss.
return nil, err
}
// cache hit, use value from cache store.
re, err := cache.DecodeRecord(r)
if err != nil {
return nil, err
}
log.Debugf("cache hit: %+v", re)
return re, nil
}

func (s *tritonServer) storeRecordInCache(ctx context.Context, key string, rp *tritonpb.Record) {
by, err := cache.EncodeRecord(rp)
if err != nil {
// Cache fails should be logged but not return error.
log.Warnf("failed to encode record for cache for key (%s): %v", key, err)
} else {
if len(by) < maxRecordSizeToCache {
if err := s.cacheStore.Set(ctx, key, by); err != nil {
log.Warnf("failed to update cache for key (%s): %v", key, err)
}
}
}
}

// shouldCache returns whether or not triton should try to store
// the record in the cache store. Default behavior is to cache
// if hint is not specified.
func shouldCache(hint *tritonpb.Hint) bool {
if hint == nil {
return true
}
return !hint.DoNotCache
}

// shouldCheckCache returns whether or not triton should try to check
// the record in the cache store. Default behavior is to check
// the cache if hint is not specified.
func shouldCheckCache(hint *tritonpb.Hint) bool {
if hint == nil {
return true
}
return !hint.SkipCache
}
Loading

0 comments on commit be6ec74

Please sign in to comment.