From be6ec74d3c10bc98af0bdaa001d5fab2591c821a Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 1 Oct 2020 15:20:42 -0700 Subject: [PATCH] feat(cache)!: add support for cache hints (#123) * feat(cache)!: add support for cache hints --- api/triton.proto | 41 +++++----- internal/app/server/triton.go | 115 +++++++++++++++++----------- internal/app/server/triton_test.go | 119 ++++++++++++++++++++++++++++- 3 files changed, 206 insertions(+), 69 deletions(-) diff --git a/api/triton.proto b/api/triton.proto index 189932fc..6de14101 100644 --- a/api/triton.proto +++ b/api/triton.proto @@ -118,23 +118,22 @@ message Record { // Performance optimization hints for the server. // The server may silently ignore the hints when not feasible. -enum Hint { - // Unspecified. This value must not be used. - HINT_UNSPECIFIED = 0; - - // Do not cache the record for future requests. - DO_NOT_CACHE = 1; - - // Skip the cache and always check the metadata server. - IGNORE_CACHE = 2; - - // Always store the blob in the bulk storage - ALWAYS_USE_BULK_STORE = 3; - - // Do not use the bulk storage. Always store the blob into the metadata - // entity. The server will return an error if the blob is too large. The exact - // size limit depends on the backend implementation. - DO_NOT_USE_BULK_STORE = 4; +message Hint { + // If true, do not cache the record for future requests. + bool do_not_cache = 1; + + // If true, skip the cache check and always check the metadata server. + // If false, allow file size to determine cache checks. + bool skip_cache = 2; + + // If true, always store the blob in blob storage, rather than in the metadata + // server. If false, allow file size to determine where to store the blob. + bool force_blob_store = 3; + + // Tells the server to not use blob storage. Always store the blob into + // the metadata entity. The server will return an error if the blob is too + // large. The exact size limit depends on the backend implementation. + bool force_inline_blob = 4; } // Store represents an internal bucket to contain records. @@ -246,7 +245,7 @@ message QueryRecordsResponse { // Performance hints. // Query caching is not supported at the moment - repeated Hint hints = 3; + Hint hint = 3; } message DeleteStoreRequest { @@ -262,7 +261,7 @@ message CreateRecordRequest { Record record = 2; // Performance hints. - repeated Hint hints = 3; + Hint hint = 3; } message GetRecordRequest { @@ -273,7 +272,7 @@ message GetRecordRequest { string key = 2; // Performance hints. - repeated Hint hints = 3; + Hint hint = 3; } message UpdateRecordRequest { @@ -284,7 +283,7 @@ message UpdateRecordRequest { Record record = 2; // Performance hints. - repeated Hint hints = 3; + Hint hint = 3; } message DeleteRecordRequest { diff --git a/internal/app/server/triton.go b/internal/app/server/triton.go index f331bc46..48aa0694 100644 --- a/internal/app/server/triton.go +++ b/internal/app/server/triton.go @@ -39,6 +39,9 @@ type tritonServer struct { cacheStore cache.Cache } +// Assert tritonServer implements tritonpb.TritonServer +var _ tritonpb.TritonServer = new(tritonServer) + // newTritonServer creates a new instance of the triton server. func newTritonServer(ctx context.Context, cloud, project, bucket, cacheAddr string) (*tritonServer, error) { switch cloud { @@ -82,7 +85,7 @@ func (s *tritonServer) CreateStore(ctx context.Context, req *tritonpb.CreateStor log.Warnf("CreateStore failed for store (%s): %v", store.Key, err) return nil, status.Convert(err).Err() } - log.Infof("Created store: %+v", store) + log.Debugf("Created store: %+v", store) return newStore.ToProto(), nil } @@ -95,19 +98,10 @@ func (s *tritonServer) CreateRecord(ctx context.Context, req *tritonpb.CreateRec return nil, status.Convert(err).Err() } - // Update cache store. - k := cache.FormatKey(req.GetStoreKey(), req.Record.GetKey()) rp := newRecord.ToProto() - by, err := cache.EncodeRecord(rp) - if err != nil { - // Cache fails should be logged but not return error. - log.Errorf("failed to encode record for cache for key (%s): %v", k, err) - } else { - if len(by) < maxRecordSizeToCache { - if err := s.cacheStore.Set(ctx, k, by); err != nil { - log.Errorf("failed to update cache for key (%s): %v", k, err) - } - } + if shouldCache(req.Hint) { + k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey()) + s.storeRecordInCache(ctx, k, rp) } return rp, nil } @@ -119,7 +113,7 @@ func (s *tritonServer) DeleteRecord(ctx context.Context, req *tritonpb.DeleteRec req.GetStoreKey(), req.GetKey(), err) return nil, status.Convert(err).Err() } - log.Infof("Deleted record: store (%s), record (%s)", + log.Debugf("Deleted record: store (%s), record (%s)", req.GetStoreKey(), req.GetKey()) // Purge record from cache store. @@ -159,22 +153,20 @@ func (s *tritonServer) DeleteStore(ctx context.Context, req *tritonpb.DeleteStor log.Warnf("DeleteStore failed for store (%s): %v", req.GetKey(), err) return nil, status.Convert(err).Err() } - log.Infof("Deletes store: %s", req.GetKey()) + log.Debugf("Deletes store: %s", req.GetKey()) return new(empty.Empty), nil } func (s *tritonServer) GetRecord(ctx context.Context, req *tritonpb.GetRecordRequest) (*tritonpb.Record, error) { k := cache.FormatKey(req.GetStoreKey(), req.GetKey()) - r, err := s.cacheStore.Get(ctx, k) - // Cache hit, use value from cache store. - if err == nil { - re, err := cache.DecodeRecord(r) + if shouldCheckCache(req.Hint) { + r, err := s.getRecordFromCache(ctx, k) if err != nil { - return nil, err + log.Debug("cache miss") + } else if r != nil { + return r, nil } - log.Infof("cache hit: %+v", re) - return re, nil } record, err := s.metaDB.GetRecord(ctx, req.GetStoreKey(), req.GetKey()) @@ -183,20 +175,13 @@ func (s *tritonServer) GetRecord(ctx context.Context, req *tritonpb.GetRecordReq req.GetStoreKey(), req.GetKey(), err) return nil, status.Convert(err).Err() } - log.Infof("Got record %+v", record) + log.Debugf("Got record %+v", record) // Update cache store. rp := record.ToProto() - by, err := cache.EncodeRecord(rp) - if err != nil { - // Cache fails should be logged but not return error. - log.Errorf("failed to encode record for cache for key (%s): %v", k, err) - } else { - if len(by) < maxRecordSizeToCache { - if err := s.cacheStore.Set(ctx, k, by); err != nil { - log.Errorf("failed to update cache for key (%s): %v", k, err) - } - } + + if shouldCache(req.Hint) { + s.storeRecordInCache(ctx, k, rp) } return rp, nil @@ -212,18 +197,11 @@ func (s *tritonServer) UpdateRecord(ctx context.Context, req *tritonpb.UpdateRec } // Update cache store. - k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey()) rp := newRecord.ToProto() - by, err := cache.EncodeRecord(rp) - if err != nil { - // Cache fails should be logged but not return error. - log.Errorf("failed to encode record for cache for key (%s): %v", k, err) - } else { - if len(by) < maxRecordSizeToCache { - if err := s.cacheStore.Set(ctx, k, by); err != nil { - log.Errorf("failed to update cache for key (%s): %v", k, err) - } - } + + if shouldCache(req.Hint) { + k := cache.FormatKey(req.GetStoreKey(), req.GetRecord().GetKey()) + s.storeRecordInCache(ctx, k, rp) } return rp, nil @@ -238,3 +216,52 @@ func (s *tritonServer) Ping(ctx context.Context, req *tritonpb.PingRequest) (*tr Pong: req.GetPing(), }, nil } + +func (s *tritonServer) getRecordFromCache(ctx context.Context, key string) (*tritonpb.Record, error) { + r, err := s.cacheStore.Get(ctx, key) + if err != nil { + // cache miss. + return nil, err + } + // cache hit, use value from cache store. + re, err := cache.DecodeRecord(r) + if err != nil { + return nil, err + } + log.Debugf("cache hit: %+v", re) + return re, nil +} + +func (s *tritonServer) storeRecordInCache(ctx context.Context, key string, rp *tritonpb.Record) { + by, err := cache.EncodeRecord(rp) + if err != nil { + // Cache fails should be logged but not return error. + log.Warnf("failed to encode record for cache for key (%s): %v", key, err) + } else { + if len(by) < maxRecordSizeToCache { + if err := s.cacheStore.Set(ctx, key, by); err != nil { + log.Warnf("failed to update cache for key (%s): %v", key, err) + } + } + } +} + +// shouldCache returns whether or not triton should try to store +// the record in the cache store. Default behavior is to cache +// if hint is not specified. +func shouldCache(hint *tritonpb.Hint) bool { + if hint == nil { + return true + } + return !hint.DoNotCache +} + +// shouldCheckCache returns whether or not triton should try to check +// the record in the cache store. Default behavior is to check +// the cache if hint is not specified. +func shouldCheckCache(hint *tritonpb.Hint) bool { + if hint == nil { + return true + } + return !hint.SkipCache +} diff --git a/internal/app/server/triton_test.go b/internal/app/server/triton_test.go index 998bc8bc..23f313f8 100644 --- a/internal/app/server/triton_test.go +++ b/internal/app/server/triton_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/uuid" pb "github.com/googleforgames/triton/api" + "github.com/googleforgames/triton/internal/pkg/cache" "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" @@ -39,7 +40,7 @@ const ( timestampDelta = 10 * time.Second ) -func getTestServer(ctx context.Context, t *testing.T, cloud string) (*grpc.Server, *bufconn.Listener) { +func getTritonServer(ctx context.Context, t *testing.T, cloud string) (*tritonServer, *bufconn.Listener) { impl, err := newTritonServer(ctx, cloud, testProject, testBucket, testCacheAddr) if err != nil { t.Fatalf("Failed to create a new Triton server instance: %v", err) @@ -54,7 +55,7 @@ func getTestServer(ctx context.Context, t *testing.T, cloud string) (*grpc.Serve } }() t.Cleanup(func() { server.Stop() }) - return server, listener + return impl, listener } func assertEqualStore(t *testing.T, expected, actual *pb.Store) { @@ -130,13 +131,14 @@ func TestTriton(t *testing.T) { } func testTritonBackend(ctx context.Context, t *testing.T, cloud string) { - _, listener := getTestServer(ctx, t, cloud) + triton, listener := getTritonServer(ctx, t, cloud) _, client := getTestClient(ctx, t, listener) t.Run("CreateGetDeleteStore", func(t *testing.T) { createGetDeleteStore(ctx, t, client) }) t.Run("CreateGetDeleteRecord", func(t *testing.T) { createGetDeleteRecord(ctx, t, client) }) t.Run("UpdateRecordSimple", func(t *testing.T) { updateRecordSimple(ctx, t, client) }) t.Run("ListStoresNamePerfectMatch", func(t *testing.T) { listStoresNamePerfectMatch(ctx, t, client) }) + t.Run("CacheRecordsWithHints", func(t *testing.T) { cacheRecordsWithHints(ctx, t, triton, client) }) } func createGetDeleteStore(ctx context.Context, t *testing.T, client pb.TritonClient) { @@ -341,9 +343,118 @@ func listStoresNamePerfectMatch(ctx context.Context, t *testing.T, client pb.Tri } } +func cacheRecordsWithHints(ctx context.Context, t *testing.T, triton *tritonServer, client pb.TritonClient) { + storeKey := uuid.New().String() + storeReq := &pb.CreateStoreRequest{ + Store: &pb.Store{ + Key: storeKey, + }, + } + _, err := client.CreateStore(ctx, storeReq) + if err != nil { + t.Fatalf("CreateStore failed: %v", err) + } + t.Cleanup(func() { + req := &pb.DeleteStoreRequest{Key: storeKey} + _, err := client.DeleteStore(ctx, req) + assert.NoError(t, err) + }) + + recordKey := uuid.New().String() + testBlob := []byte{0x42, 0x24, 0x00} + createReq := &pb.CreateRecordRequest{ + StoreKey: storeKey, + Record: &pb.Record{ + Key: recordKey, + Blob: testBlob, + BlobSize: int64(len(testBlob)), + Tags: []string{"tag1", "tag2"}, + OwnerId: "owner", + Properties: map[string]*pb.Property{ + "prop1": { + Type: pb.Property_INTEGER, + Value: &pb.Property_IntegerValue{IntegerValue: -42}, + }, + }, + }, + Hint: &pb.Hint{ + DoNotCache: true, + }, + } + expected := createReq.Record + record, err := client.CreateRecord(ctx, createReq) + if err != nil { + t.Fatalf("CreateRecord failed: %v", err) + } + expected.CreatedAt = timestamppb.Now() + expected.UpdatedAt = expected.CreatedAt + assertEqualRecord(t, expected, record) + assert.Equal(t, record.GetCreatedAt(), record.GetUpdatedAt()) + + // Check do not cache hint was honored. + key := cache.FormatKey(storeKey, recordKey) + recFromCache, _ := triton.getRecordFromCache(ctx, key) + assert.Nil(t, recFromCache, "should not have retrieved record from cache after Create with DoNotCache hint") + + getReq := &pb.GetRecordRequest{ + StoreKey: storeKey, + Key: recordKey, + Hint: &pb.Hint{ + DoNotCache: true, + }, + } + if _, err = client.GetRecord(ctx, getReq); err != nil { + t.Errorf("GetRecord failed: %v", err) + } + + recFromCache2, _ := triton.getRecordFromCache(ctx, key) + assert.Nil(t, recFromCache2, "should not have retrieved record from cache after Get with DoNotCache hint") + + // Modify GetRecordRequest to not use the hint. + getReq.Hint = nil + if _, err = client.GetRecord(ctx, getReq); err != nil { + t.Errorf("GetRecord failed: %v", err) + } + + recFromCache3, _ := triton.getRecordFromCache(ctx, key) + assert.NotNil(t, recFromCache3, "should have retrieved record from cache after Get without hints") + assertEqualRecord(t, expected, recFromCache3) + + // Insert some bad data directly into the cache store. + // Check that the SkipCache hint successfully skips the + // cache and retrieves the correct data directly. + triton.storeRecordInCache(ctx, key, &pb.Record{ + Key: "bad record", + }) + getReqSkipCache := &pb.GetRecordRequest{ + StoreKey: storeKey, + Key: recordKey, + Hint: &pb.Hint{ + SkipCache: true, + }, + } + gotRecord, err := client.GetRecord(ctx, getReqSkipCache) + if err != nil { + t.Errorf("GetRecord failed: %v", err) + } + assertEqualRecord(t, expected, gotRecord) + + deleteReq := &pb.DeleteRecordRequest{ + StoreKey: storeKey, + Key: recordKey, + } + _, err = client.DeleteRecord(ctx, deleteReq) + if err != nil { + t.Errorf("DeleteRecord failed: %v", err) + } + + recFromCache4, _ := triton.getRecordFromCache(ctx, key) + assert.Nil(t, recFromCache4, "should not have retrieved record from cache post-delete") +} + func TestTriton_Ping(t *testing.T) { ctx := context.Background() - _, listener := getTestServer(ctx, t, "gcp") + _, listener := getTritonServer(ctx, t, "gcp") _, client := getTestClient(ctx, t, listener) pong, err := client.Ping(ctx, new(pb.PingRequest))