From d8eba17dd6f274e14654188f8b27cc77b00c6aa9 Mon Sep 17 00:00:00 2001
From: panshuai-ps <49754046+panshuai-ps@users.noreply.github.com>
Date: Thu, 18 Jan 2024 10:49:49 +0800
Subject: [PATCH] feat: delete redundant resources from ES when the syncer
 starts (#255)

## What type of PR is this?

/kind feature

## What this PR does / why we need it:

Delete redundant resources from ES when the syncer starts. Resources that were removed from the cluster while the syncer was not running used to linger in the search index; by backing the informer's DeltaFIFO with an Elasticsearch-based `KnownObjects` store, the initial list now produces delete events for them (an illustrative sketch of this behavior follows the diff).

## Which issue(s) this PR fixes:

Fixes #256
---
 pkg/core/handler/search/search.go             |  2 +-
 pkg/core/manager/insight/scanner.go           |  2 +-
 pkg/core/manager/insight/util.go              |  2 +-
 .../search/storage/elasticsearch/search.go    | 42 +++++---
 .../search/storage/elasticsearch/storage.go   |  2 +-
 pkg/infra/search/storage/types.go             |  7 +-
 pkg/syncer/cache/resource_informer.go         |  3 +-
 pkg/syncer/cache/resource_informer_test.go    |  4 +-
 pkg/syncer/source.go                          | 10 +-
 pkg/syncer/syncer.go                          |  2 +-
 pkg/syncer/utils/elasticsearch_getter.go      | 95 +++++++++++++++++++
 11 files changed, 145 insertions(+), 26 deletions(-)
 create mode 100644 pkg/syncer/utils/elasticsearch_getter.go

diff --git a/pkg/core/handler/search/search.go b/pkg/core/handler/search/search.go
index b01a196d..0176fd9d 100644
--- a/pkg/core/handler/search/search.go
+++ b/pkg/core/handler/search/search.go
@@ -66,7 +66,7 @@ func SearchForResource(searchMgr *search.SearchManager, searchStorage storage.Se
 
 	logger.Info("Searching for resources...", "page", searchPage, "pageSize", searchPageSize)
 
-	res, err := searchStorage.Search(ctx, searchQuery, searchPattern, searchPageSize, searchPage)
+	res, err := searchStorage.Search(ctx, searchQuery, searchPattern, &storage.Pagination{Page: searchPage, PageSize: searchPageSize})
 	if err != nil {
 		render.Render(w, r, handler.FailureResponse(ctx, err))
 		return
diff --git a/pkg/core/manager/insight/scanner.go b/pkg/core/manager/insight/scanner.go
index 14cb1bd4..9084aa8b 100644
--- a/pkg/core/manager/insight/scanner.go
+++ b/pkg/core/manager/insight/scanner.go
@@ -102,7 +102,7 @@ func (i *InsightManager) scanFor(ctx context.Context, locator core.Locator, noCa
 		"searchQuery", searchQuery, "searchPattern", searchPattern,
 		"searchPageSize", pageSizeIteration, "searchPage", pageIteration)
 
-	res, err := i.search.Search(ctx, searchQuery, searchPattern, pageSizeIteration, pageIteration)
+	res, err := i.search.Search(ctx, searchQuery, searchPattern, &storage.Pagination{Page: pageIteration, PageSize: pageSizeIteration})
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/core/manager/insight/util.go b/pkg/core/manager/insight/util.go
index 512e91f2..1cbf4f9c 100644
--- a/pkg/core/manager/insight/util.go
+++ b/pkg/core/manager/insight/util.go
@@ -92,7 +92,7 @@ func (i *InsightManager) CountResourcesByNamespace(ctx context.Context, client *
 		"searchQuery", searchQuery, "searchPattern", searchPattern,
 		"searchPageSize", pageSizeIteration, "searchPage", pageIteration)
 	for {
-		res, err := i.search.Search(ctx, searchQuery, searchPattern, pageSizeIteration, pageIteration)
+		res, err := i.search.Search(ctx, searchQuery, searchPattern, &storage.Pagination{Page: pageIteration, PageSize: pageSizeIteration})
 		if err != nil {
 			return nil, err
 		}
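The three call sites above now build a `storage.Pagination` value (note that `Page` takes the page number and `PageSize` the page size) instead of passing `pageSize, page` positionally. A minimal sketch of a caller against the updated `SearchStorage` interface, illustrative only and not part of this patch; the SQL string is just a plausible example of the SQL pattern:

```go
package example

import (
	"context"

	"github.com/KusionStack/karbour/pkg/infra/search/storage"
)

// listPods shows the updated Search signature: pagination is passed as a
// struct, and Page is 1-based, so Page=2 with PageSize=20 maps to from=20,
// size=20 in the Elasticsearch backend changed further down in this patch.
func listPods(ctx context.Context, s storage.SearchStorage) (*storage.SearchResult, error) {
	return s.Search(ctx, "select * from resources where kind='Pod'", storage.SQLPatternType,
		&storage.Pagination{Page: 2, PageSize: 20})
}
```

Passing `nil` instead of a `*storage.Pagination` leaves size/from unset, which the `Get` path below relies on.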
"github.com/pkg/errors" ) -func (s *ESClient) Search(ctx context.Context, queryStr string, patternType string, pageSize, page int) (*storage.SearchResult, error) { +type Pagination struct { + Page int + PageSize int +} + +func (s *ESClient) Search(ctx context.Context, queryStr string, patternType string, pagination *storage.Pagination) (*storage.SearchResult, error) { var res *SearchResponse var err error switch patternType { case storage.DSLPatternType: - res, err = s.searchByDSL(ctx, queryStr, pageSize, page) + res, err = s.searchByDSL(ctx, queryStr, pagination) if err != nil { return nil, errors.Wrap(err, "search by DSL failed") } case storage.SQLPatternType: - res, err = s.searchBySQL(ctx, queryStr, pageSize, page) + res, err = s.searchBySQL(ctx, queryStr, pagination) if err != nil { return nil, errors.Wrap(err, "search by SQL failed") } @@ -58,7 +64,7 @@ func (s *ESClient) Search(ctx context.Context, queryStr string, patternType stri return rt, nil } -func (s *ESClient) searchByDSL(ctx context.Context, dslStr string, pageSize, page int) (*SearchResponse, error) { +func (s *ESClient) searchByDSL(ctx context.Context, dslStr string, pagination *storage.Pagination) (*SearchResponse, error) { queries, err := Parse(dslStr) if err != nil { return nil, err @@ -67,38 +73,44 @@ func (s *ESClient) searchByDSL(ctx context.Context, dslStr string, pageSize, pag if err != nil { return nil, err } - res, err := s.searchByQuery(ctx, esQuery, pageSize, page) + res, err := s.SearchByQuery(ctx, esQuery, pagination) if err != nil { return nil, err } return res, nil } -func (s *ESClient) searchBySQL(ctx context.Context, sqlStr string, pageSize, page int) (*SearchResponse, error) { +func (s *ESClient) searchBySQL(ctx context.Context, sqlStr string, pagination *storage.Pagination) (*SearchResponse, error) { dsl, _, err := elasticsql.Convert(sqlStr) if err != nil { return nil, err } - return s.search(ctx, strings.NewReader(dsl), pageSize, page) + return s.search(ctx, strings.NewReader(dsl), pagination) } -func (s *ESClient) searchByQuery(ctx context.Context, query map[string]interface{}, pageSize, page int) (*SearchResponse, error) { +func (s *ESClient) SearchByQuery(ctx context.Context, query map[string]interface{}, pagination *storage.Pagination) (*SearchResponse, error) { buf := &bytes.Buffer{} if err := json.NewEncoder(buf).Encode(query); err != nil { return nil, err } - return s.search(ctx, buf, pageSize, page) + return s.search(ctx, buf, pagination) } -func (s *ESClient) search(ctx context.Context, body io.Reader, pageSize, page int) (*SearchResponse, error) { - from := (page - 1) * pageSize - resp, err := s.client.Search( +func (s *ESClient) search(ctx context.Context, body io.Reader, pagination *storage.Pagination) (*SearchResponse, error) { + opts := []func(*esapi.SearchRequest){ s.client.Search.WithContext(ctx), s.client.Search.WithIndex(s.indexName), s.client.Search.WithBody(body), - s.client.Search.WithSize(pageSize), - s.client.Search.WithFrom(from), - ) + } + if pagination != nil { + from := (pagination.Page - 1) * pagination.PageSize + opts = append( + opts, + s.client.Search.WithSize(pagination.PageSize), + s.client.Search.WithFrom(from), + ) + } + resp, err := s.client.Search(opts...) 
diff --git a/pkg/infra/search/storage/elasticsearch/storage.go b/pkg/infra/search/storage/elasticsearch/storage.go
index d4fc14d6..4a50824b 100644
--- a/pkg/infra/search/storage/elasticsearch/storage.go
+++ b/pkg/infra/search/storage/elasticsearch/storage.go
@@ -49,7 +49,7 @@ func (s *ESClient) Get(ctx context.Context, cluster string, obj runtime.Object)
 	}
 
 	query := generateQuery(cluster, unObj.GetNamespace(), unObj.GetName(), unObj)
-	sr, err := s.searchByQuery(ctx, query, 10, 0)
+	sr, err := s.SearchByQuery(ctx, query, nil)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/infra/search/storage/types.go b/pkg/infra/search/storage/types.go
index ff5387b6..722cb41f 100644
--- a/pkg/infra/search/storage/types.go
+++ b/pkg/infra/search/storage/types.go
@@ -45,8 +45,13 @@ type Query struct {
 	Operator string
 }
 
+type Pagination struct {
+	Page     int
+	PageSize int
+}
+
 type SearchStorage interface {
-	Search(ctx context.Context, queryString, patternType string, pageSize, page int) (*SearchResult, error)
+	Search(ctx context.Context, queryString, patternType string, pagination *Pagination) (*SearchResult, error)
 }
 
 type SearchStorageGetter interface {
diff --git a/pkg/syncer/cache/resource_informer.go b/pkg/syncer/cache/resource_informer.go
index 7686633a..1fe13dfb 100644
--- a/pkg/syncer/cache/resource_informer.go
+++ b/pkg/syncer/cache/resource_informer.go
@@ -61,10 +61,11 @@ func NewResourceInformer(lw cache.ListerWatcher,
 	transform cache.TransformFunc,
 	resyncPeriod time.Duration,
 	handler ResourceHandler,
+	knownObjects cache.KeyListerGetter,
 ) cache.Controller {
 	informerCache := NewResourceCache()
 
 	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
-		KnownObjects:          informerCache,
+		KnownObjects:          knownObjects,
 		EmitDeltaTypeReplaced: true,
 	})
diff --git a/pkg/syncer/cache/resource_informer_test.go b/pkg/syncer/cache/resource_informer_test.go
index 9e5bd6c6..6e229127 100644
--- a/pkg/syncer/cache/resource_informer_test.go
+++ b/pkg/syncer/cache/resource_informer_test.go
@@ -169,7 +169,7 @@ func TestInformerWithSelectors(t *testing.T) {
 			},
 		}
 
-		informer := NewResourceInformer(lw, utils.MultiSelectors(tt.selectors), nil, 0, recorder.resourceHandler())
+		informer := NewResourceInformer(lw, utils.MultiSelectors(tt.selectors), nil, 0, recorder.resourceHandler(), nil)
 		stop := make(chan struct{})
 		defer close(stop)
 		go informer.Run(stop)
@@ -275,7 +275,7 @@ func TestInformerWithTransformer(t *testing.T) {
 			},
 		}
 
-		informer := NewResourceInformer(lw, nil, tt.transFunc, 0, recorder.resourceHandler())
+		informer := NewResourceInformer(lw, nil, tt.transFunc, 0, recorder.resourceHandler(), nil)
 		stop := make(chan struct{})
 		defer close(stop)
 		go informer.Run(stop)
diff --git a/pkg/syncer/source.go b/pkg/syncer/source.go
index 5d7815f6..8764a958 100644
--- a/pkg/syncer/source.go
+++ b/pkg/syncer/source.go
@@ -21,6 +21,8 @@ import (
 	"text/template"
 	"time"
 
+	"github.com/KusionStack/karbour/pkg/infra/search/storage"
+	"github.com/KusionStack/karbour/pkg/infra/search/storage/elasticsearch"
 	"github.com/KusionStack/karbour/pkg/kubernetes/apis/search/v1beta1"
 	"github.com/KusionStack/karbour/pkg/syncer/cache"
 	"github.com/KusionStack/karbour/pkg/syncer/internal"
@@ -53,6 +55,7 @@ type SyncSource interface {
 type informerSource struct {
 	cluster string
 	v1beta1.ResourceSyncRule
+	storage storage.Storage
 
 	client   dynamic.Interface
 	informer clientgocache.Controller
@@ -62,9 +65,10 @@ type informerSource struct {
 	stopped chan struct{}
 }
 
-func NewSource(cluster string, client dynamic.Interface, rsr v1beta1.ResourceSyncRule) SyncSource {
+func NewSource(cluster string, client dynamic.Interface, rsr v1beta1.ResourceSyncRule, storage storage.Storage) SyncSource {
 	return &informerSource{
 		cluster:          cluster,
+		storage:          storage,
 		ResourceSyncRule: rsr,
 		client:           client,
 		stopped:          make(chan struct{}),
 	}
@@ -140,7 +144,9 @@ func (s *informerSource) createInformer(_ context.Context, handler ctrlhandler.E
 	}
 
 	h := internal.EventHandler{EventHandler: handler, Queue: queue, Predicates: predicates}
-	return cache.NewResourceInformer(lw, utils.MultiSelectors(selectors), transform, resyncPeriod, h), nil
+	// TODO: Use interface instead of struct
+	knownObjects := utils.NewESListerGetter(s.cluster, s.storage.(*elasticsearch.ESClient), gvr)
+	return cache.NewResourceInformer(lw, utils.MultiSelectors(selectors), transform, resyncPeriod, h, knownObjects), nil
 }
 
 func (s *informerSource) HasSynced() bool {
diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go
index 960d456d..ad94670c 100644
--- a/pkg/syncer/syncer.go
+++ b/pkg/syncer/syncer.go
@@ -98,7 +98,7 @@ type ResourceSyncer struct {
 }
 
 func NewResourceSyncer(cluster string, dynamicClient dynamic.Interface, rsr v1beta1.ResourceSyncRule, storage storage.Storage) *ResourceSyncer {
-	source := NewSource(cluster, dynamicClient, rsr)
+	source := NewSource(cluster, dynamicClient, rsr, storage)
 	return &ResourceSyncer{
 		source:  source,
 		storage: storage,
diff --git a/pkg/syncer/utils/elasticsearch_getter.go b/pkg/syncer/utils/elasticsearch_getter.go
new file mode 100644
index 00000000..5bfe92ec
--- /dev/null
+++ b/pkg/syncer/utils/elasticsearch_getter.go
@@ -0,0 +1,95 @@
+package utils
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/KusionStack/karbour/pkg/infra/search/storage/elasticsearch"
+	"github.com/aquasecurity/esquery"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	k8scache "k8s.io/client-go/tools/cache"
+)
+
+var _ k8scache.KeyListerGetter = &ESListerGetter{}
+
+type ESListerGetter struct {
+	cluster  string
+	esClient *elasticsearch.ESClient
+	gvr      schema.GroupVersionResource
+}
+
+func NewESListerGetter(cluster string, esClient *elasticsearch.ESClient, gvr schema.GroupVersionResource) *ESListerGetter {
+	return &ESListerGetter{
+		cluster:  cluster,
+		esClient: esClient,
+		gvr:      gvr,
+	}
+}
+
+func (e *ESListerGetter) ListKeys() []string {
+	resource := e.gvr.Resource
+	kind := resource[0 : len(resource)-1]
+	query := make(map[string]interface{})
+	query["query"] = esquery.Bool().Must(
+		esquery.Term("cluster", e.cluster),
+		esquery.Term("apiVersion", e.gvr.GroupVersion().String()),
+		esquery.Term("kind", kind),
+	).Map()
+	sr, err := e.esClient.SearchByQuery(context.Background(), query, nil)
+	if err != nil {
+		return nil
+	}
+	rt := []string{}
+	for _, r := range sr.GetResources() {
+		name, _, _ := unstructured.NestedString(r.Object, "metadata", "name")
+		ns, _, _ := unstructured.NestedString(r.Object, "metadata", "namespace")
+		var key string
+		if ns != "" && name != "" {
+			key = ns + "/" + name
+		} else if name != "" {
+			key = name
+		}
+		if key != "" {
+			rt = append(rt, key)
+		}
+	}
+	return rt
+}
+
+func (e *ESListerGetter) GetByKey(key string) (value interface{}, exists bool, err error) {
+	s := strings.Split(key, "/")
+	var name, ns string
+	switch len(s) {
+	case 1:
+		name = s[0]
+	case 2:
+		ns = s[0]
+		name = s[1]
+	default:
+		return nil, false, fmt.Errorf("invalid key: %s", key)
+	}
+
+	resource := e.gvr.Resource
+	kind := resource[0 : len(resource)-1]
+	query := make(map[string]interface{})
+	query["query"] = esquery.Bool().Must(
+		esquery.Term("cluster", e.cluster),
+		esquery.Term("apiVersion", e.gvr.GroupVersion().String()),
+		esquery.Term("kind", kind),
+		esquery.Term("namespace", ns),
+		esquery.Term("name", name)).Map()
+	sr, err := e.esClient.SearchByQuery(context.Background(), query, nil)
+	if err != nil {
+		return nil, false, err
+	}
+	resources := sr.GetResources()
+	if len(resources) != 1 {
+		return nil, false, fmt.Errorf("query result expected 1, got %d", len(resources))
+	}
+
+	unObj := &unstructured.Unstructured{}
+	unObj.SetUnstructuredContent(resources[0].Object)
+	return unObj, true, nil
+}
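How the startup cleanup works: `NewResourceInformer` now hands the ES-backed `ESListerGetter` to the DeltaFIFO as `KnownObjects`, so the informer's initial list (a `Replace`) queues `Deleted` deltas for every key that exists in the index but not in the cluster, which is what lets the syncer drop those documents from ES at startup. A self-contained sketch of that client-go behavior; the `stubIndex` type below is a stand-in for `utils.ESListerGetter` and is not code from this patch:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/tools/cache"
)

// stubIndex stands in for utils.ESListerGetter: it claims the search index
// already knows two pods, "kept" and "stale".
type stubIndex struct{}

func (stubIndex) ListKeys() []string { return []string{"default/kept", "default/stale"} }

func (stubIndex) GetByKey(key string) (interface{}, bool, error) {
	return pod(key[len("default/"):]), true, nil
}

func pod(name string) *unstructured.Unstructured {
	u := &unstructured.Unstructured{}
	u.SetAPIVersion("v1")
	u.SetKind("Pod")
	u.SetNamespace("default")
	u.SetName(name)
	return u
}

func main() {
	// Same wiring as NewResourceInformer above: the known-objects store is the
	// search index instead of the informer's own cache.
	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KnownObjects:          stubIndex{},
		EmitDeltaTypeReplaced: true,
	})

	// The initial cluster list only contains "kept"; "stale" exists only in the index.
	if err := fifo.Replace([]interface{}{pod("kept")}, "1"); err != nil {
		panic(err)
	}

	// Replace queued a Deleted delta (wrapped in DeletedFinalStateUnknown) for
	// the key that is known to the index but absent from the cluster list.
	obj, exists, err := fifo.GetByKey("default/stale")
	if err != nil || !exists {
		panic("expected queued deltas for default/stale")
	}
	for _, d := range obj.(cache.Deltas) {
		fmt.Printf("%s %T\n", d.Type, d.Object) // Deleted cache.DeletedFinalStateUnknown
	}
}
```

Two hedged observations on `ESListerGetter` itself: `ListKeys` queries with a nil pagination, so only Elasticsearch's default number of hits (typically 10) is returned, which can leave the startup cleanup partial for large resource sets unless paging or scrolling is added; and the kind is derived by stripping a trailing "s" from the resource name, which produces a lowercase, sometimes incorrect singular (for example "ingresses"), so whether the term query matches depends on how the `kind` field is analyzed in the index.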