feat: delete redundant resource from ES when syncer starts (#255)
## What type of PR is this?
/kind feature

## What this PR does / why we need it:
Delete redundant resources from Elasticsearch (ES) when the syncer starts. The syncer's informer now treats the ES index as its set of known objects, so the initial list can surface resources that are still stored in ES but no longer exist in the cluster, and those stale documents are deleted.

## Which issue(s) this PR fixes:
Fixes #256
panshuai111 authored Jan 18, 2024
1 parent 91da8e3 commit d8eba17
Showing 11 changed files with 145 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/core/handler/search/search.go
@@ -66,7 +66,7 @@ func SearchForResource(searchMgr *search.SearchManager, searchStorage storage.Se

logger.Info("Searching for resources...", "page", searchPage, "pageSize", searchPageSize)

res, err := searchStorage.Search(ctx, searchQuery, searchPattern, searchPageSize, searchPage)
res, err := searchStorage.Search(ctx, searchQuery, searchPattern, &storage.Pagination{Page: searchPage, PageSize: searchPageSize})
if err != nil {
render.Render(w, r, handler.FailureResponse(ctx, err))
return
2 changes: 1 addition & 1 deletion pkg/core/manager/insight/scanner.go
@@ -102,7 +102,7 @@ func (i *InsightManager) scanFor(ctx context.Context, locator core.Locator, noCa
"searchQuery", searchQuery, "searchPattern", searchPattern,
"searchPageSize", pageSizeIteration, "searchPage", pageIteration)

res, err := i.search.Search(ctx, searchQuery, searchPattern, pageSizeIteration, pageIteration)
res, err := i.search.Search(ctx, searchQuery, searchPattern, &storage.Pagination{Page: pageIteration, PageSize: pageSizeIteration})
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/core/manager/insight/util.go
@@ -92,7 +92,7 @@ func (i *InsightManager) CountResourcesByNamespace(ctx context.Context, client *
"searchQuery", searchQuery, "searchPattern", searchPattern, "searchPageSize", pageSizeIteration, "searchPage", pageIteration)

for {
res, err := i.search.Search(ctx, searchQuery, searchPattern, pageSizeIteration, pageIteration)
res, err := i.search.Search(ctx, searchQuery, searchPattern, &storage.Pagination{Page: pageIteration, PageSize: pageSizeIteration})
if err != nil {
return nil, err
}
42 changes: 27 additions & 15 deletions pkg/infra/search/storage/elasticsearch/search.go
@@ -24,21 +24,27 @@ import (

"github.com/KusionStack/karbour/pkg/infra/search/storage"
"github.com/cch123/elasticsql"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/pkg/errors"
)

func (s *ESClient) Search(ctx context.Context, queryStr string, patternType string, pageSize, page int) (*storage.SearchResult, error) {
type Pagination struct {
Page int
PageSize int
}

func (s *ESClient) Search(ctx context.Context, queryStr string, patternType string, pagination *storage.Pagination) (*storage.SearchResult, error) {
var res *SearchResponse
var err error

switch patternType {
case storage.DSLPatternType:
res, err = s.searchByDSL(ctx, queryStr, pageSize, page)
res, err = s.searchByDSL(ctx, queryStr, pagination)
if err != nil {
return nil, errors.Wrap(err, "search by DSL failed")
}
case storage.SQLPatternType:
res, err = s.searchBySQL(ctx, queryStr, pageSize, page)
res, err = s.searchBySQL(ctx, queryStr, pagination)
if err != nil {
return nil, errors.Wrap(err, "search by SQL failed")
}
@@ -58,7 +64,7 @@ func (s *ESClient) Search(ctx context.Context, queryStr string, patternType stri
return rt, nil
}

func (s *ESClient) searchByDSL(ctx context.Context, dslStr string, pageSize, page int) (*SearchResponse, error) {
func (s *ESClient) searchByDSL(ctx context.Context, dslStr string, pagination *storage.Pagination) (*SearchResponse, error) {
queries, err := Parse(dslStr)
if err != nil {
return nil, err
@@ -67,38 +73,44 @@ func (s *ESClient) searchByDSL(ctx context.Context, dslStr string, pageSize, pag
if err != nil {
return nil, err
}
res, err := s.searchByQuery(ctx, esQuery, pageSize, page)
res, err := s.SearchByQuery(ctx, esQuery, pagination)
if err != nil {
return nil, err
}
return res, nil
}

func (s *ESClient) searchBySQL(ctx context.Context, sqlStr string, pageSize, page int) (*SearchResponse, error) {
func (s *ESClient) searchBySQL(ctx context.Context, sqlStr string, pagination *storage.Pagination) (*SearchResponse, error) {
dsl, _, err := elasticsql.Convert(sqlStr)
if err != nil {
return nil, err
}
return s.search(ctx, strings.NewReader(dsl), pageSize, page)
return s.search(ctx, strings.NewReader(dsl), pagination)
}

func (s *ESClient) searchByQuery(ctx context.Context, query map[string]interface{}, pageSize, page int) (*SearchResponse, error) {
func (s *ESClient) SearchByQuery(ctx context.Context, query map[string]interface{}, pagination *storage.Pagination) (*SearchResponse, error) {
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(query); err != nil {
return nil, err
}
return s.search(ctx, buf, pageSize, page)
return s.search(ctx, buf, pagination)
}

func (s *ESClient) search(ctx context.Context, body io.Reader, pageSize, page int) (*SearchResponse, error) {
from := (page - 1) * pageSize
resp, err := s.client.Search(
func (s *ESClient) search(ctx context.Context, body io.Reader, pagination *storage.Pagination) (*SearchResponse, error) {
opts := []func(*esapi.SearchRequest){
s.client.Search.WithContext(ctx),
s.client.Search.WithIndex(s.indexName),
s.client.Search.WithBody(body),
s.client.Search.WithSize(pageSize),
s.client.Search.WithFrom(from),
)
}
if pagination != nil {
from := (pagination.Page - 1) * pagination.PageSize
opts = append(
opts,
s.client.Search.WithSize(pagination.PageSize),
s.client.Search.WithFrom(from),
)
}
resp, err := s.client.Search(opts...)
if err != nil {
return nil, err
}
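For orientation, a minimal sketch (not part of the diff) of the now-exported `SearchByQuery` with and without pagination; it assumes an already-constructed `*elasticsearch.ESClient`, and the term query is illustrative. With `Page: 2, PageSize: 50` the request is sent with `size=50` and `from=(2-1)*50=50`; with a nil pagination no size/from options are added, so Elasticsearch's default result window applies.

```go
package example

import (
    "context"
    "fmt"

    "github.com/KusionStack/karbour/pkg/infra/search/storage"
    "github.com/KusionStack/karbour/pkg/infra/search/storage/elasticsearch"
)

func pagedVersusDefault(es *elasticsearch.ESClient) error {
    // Illustrative term query; the field name mirrors the "kind" term used by
    // the new ES lister/getter later in this commit.
    query := map[string]interface{}{
        "query": map[string]interface{}{
            "term": map[string]interface{}{"kind": "Deployment"},
        },
    }

    // Page 2, 50 hits per page => size=50, from=50.
    paged, err := es.SearchByQuery(context.Background(), query, &storage.Pagination{Page: 2, PageSize: 50})
    if err != nil {
        return err
    }
    fmt.Println("paged hits:", len(paged.GetResources()))

    // nil pagination => no size/from are set; Elasticsearch's default window applies.
    all, err := es.SearchByQuery(context.Background(), query, nil)
    if err != nil {
        return err
    }
    fmt.Println("default-window hits:", len(all.GetResources()))
    return nil
}
```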
2 changes: 1 addition & 1 deletion pkg/infra/search/storage/elasticsearch/storage.go
@@ -49,7 +49,7 @@ func (s *ESClient) Get(ctx context.Context, cluster string, obj runtime.Object)
}

query := generateQuery(cluster, unObj.GetNamespace(), unObj.GetName(), unObj)
sr, err := s.searchByQuery(ctx, query, 10, 0)
sr, err := s.SearchByQuery(ctx, query, nil)
if err != nil {
return err
}
7 changes: 6 additions & 1 deletion pkg/infra/search/storage/types.go
@@ -45,8 +45,13 @@ type Query struct {
Operator string
}

type Pagination struct {
Page int
PageSize int
}

type SearchStorage interface {
Search(ctx context.Context, queryString, patternType string, pageSize, page int) (*SearchResult, error)
Search(ctx context.Context, queryString, patternType string, pagination *Pagination) (*SearchResult, error)
}

type SearchStorageGetter interface {
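A hedged sketch (not part of this diff) of how a consumer of the updated `SearchStorage` interface passes the new `Pagination`; the query string, pattern type, and page size are whatever the caller supplies. Passing a nil `*Pagination` is also legal (the `Get` method in storage.go above does exactly that), in which case the ES backend adds no size/from options.

```go
package example

import (
    "context"

    "github.com/KusionStack/karbour/pkg/infra/search/storage"
)

// firstPage asks any SearchStorage implementation for the first page of 20 hits.
// queryString and patternType (storage.SQLPatternType or storage.DSLPatternType)
// come from the caller; only the pagination handling is the point here.
func firstPage(ctx context.Context, s storage.SearchStorage, queryString, patternType string) (*storage.SearchResult, error) {
    return s.Search(ctx, queryString, patternType, &storage.Pagination{Page: 1, PageSize: 20})
}
```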
3 changes: 2 additions & 1 deletion pkg/syncer/cache/resource_informer.go
@@ -61,10 +61,11 @@ func NewResourceInformer(lw cache.ListerWatcher,
transform cache.TransformFunc,
resyncPeriod time.Duration,
handler ResourceHandler,
knownObjects cache.KeyListerGetter,
) cache.Controller {
informerCache := NewResourceCache()
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
KnownObjects: informerCache,
KnownObjects: knownObjects,
EmitDeltaTypeReplaced: true,
})

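The new `knownObjects` parameter is what enables cleanup at startup: when a DeltaFIFO is created with `KnownObjects` and `EmitDeltaTypeReplaced`, its `Replace` call (the informer's initial list) emits Deleted deltas, wrapped in `DeletedFinalStateUnknown`, for every key that `KnownObjects` reports but the fresh list no longer contains. Any `cache.KeyListerGetter` can play that role; the toy, map-backed implementation below (illustrative only, not part of this commit) shows the two-method contract that the ES-backed getter added later in this commit satisfies.

```go
package example

import (
    "k8s.io/client-go/tools/cache"
)

// mapKnownObjects is a toy cache.KeyListerGetter backed by a map. The real
// KnownObjects store introduced by this commit (ESListerGetter) answers the
// same two calls from Elasticsearch instead of from memory.
type mapKnownObjects struct {
    objs map[string]interface{} // "namespace/name" (or "name") -> object
}

// ListKeys returns every key this store knows about.
func (m mapKnownObjects) ListKeys() []string {
    keys := make([]string, 0, len(m.objs))
    for k := range m.objs {
        keys = append(keys, k)
    }
    return keys
}

// GetByKey returns the stored object for key, if any.
func (m mapKnownObjects) GetByKey(key string) (interface{}, bool, error) {
    obj, ok := m.objs[key]
    return obj, ok, nil
}

var _ cache.KeyListerGetter = mapKnownObjects{}
```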
4 changes: 2 additions & 2 deletions pkg/syncer/cache/resource_informer_test.go
@@ -169,7 +169,7 @@ func TestInformerWithSelectors(t *testing.T) {
},
}

informer := NewResourceInformer(lw, utils.MultiSelectors(tt.selectors), nil, 0, recorder.resourceHandler())
informer := NewResourceInformer(lw, utils.MultiSelectors(tt.selectors), nil, 0, recorder.resourceHandler(), nil)
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
@@ -275,7 +275,7 @@ func TestInformerWithTransformer(t *testing.T) {
},
}

informer := NewResourceInformer(lw, nil, tt.transFunc, 0, recorder.resourceHandler())
informer := NewResourceInformer(lw, nil, tt.transFunc, 0, recorder.resourceHandler(), nil)
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
10 changes: 8 additions & 2 deletions pkg/syncer/source.go
@@ -21,6 +21,8 @@ import (
"text/template"
"time"

"github.com/KusionStack/karbour/pkg/infra/search/storage"
"github.com/KusionStack/karbour/pkg/infra/search/storage/elasticsearch"
"github.com/KusionStack/karbour/pkg/kubernetes/apis/search/v1beta1"
"github.com/KusionStack/karbour/pkg/syncer/cache"
"github.com/KusionStack/karbour/pkg/syncer/internal"
@@ -53,6 +55,7 @@ type SyncSource interface {
type informerSource struct {
cluster string
v1beta1.ResourceSyncRule
storage storage.Storage

client dynamic.Interface
informer clientgocache.Controller
@@ -62,9 +65,10 @@ type informerSource struct {
stopped chan struct{}
}

func NewSource(cluster string, client dynamic.Interface, rsr v1beta1.ResourceSyncRule) SyncSource {
func NewSource(cluster string, client dynamic.Interface, rsr v1beta1.ResourceSyncRule, storage storage.Storage) SyncSource {
return &informerSource{
cluster: cluster,
storage: storage,
ResourceSyncRule: rsr,
client: client,
stopped: make(chan struct{}),
@@ -140,7 +144,9 @@ func (s *informerSource) createInformer(_ context.Context, handler ctrlhandler.E
}

h := internal.EventHandler{EventHandler: handler, Queue: queue, Predicates: predicates}
return cache.NewResourceInformer(lw, utils.MultiSelectors(selectors), transform, resyncPeriod, h), nil
// TODO: Use interface instead of struct
knownObjects := utils.NewESListerGetter(s.cluster, s.storage.(*elasticsearch.ESClient), gvr)
return cache.NewResourceInformer(lw, utils.MultiSelectors(selectors), transform, resyncPeriod, h, knownObjects), nil
}

func (s *informerSource) HasSynced() bool {
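As the TODO above notes, the assertion `s.storage.(*elasticsearch.ESClient)` ties this path to the ES backend and panics if some other `storage.Storage` implementation is configured. A possible defensive variant (a sketch only, not what this commit does) would treat startup cleanup as best-effort; a nil `KnownObjects` is accepted by the informer (the updated tests pass nil), in which case DeltaFIFO falls back to its own item set when detecting deletions.

```go
// Hypothetical variant: only wire up the ES-backed known-objects store when the
// configured storage really is an *elasticsearch.ESClient.
var knownObjects clientgocache.KeyListerGetter
if esClient, ok := s.storage.(*elasticsearch.ESClient); ok {
    knownObjects = utils.NewESListerGetter(s.cluster, esClient, gvr)
}
return cache.NewResourceInformer(lw, utils.MultiSelectors(selectors), transform, resyncPeriod, h, knownObjects), nil
```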
2 changes: 1 addition & 1 deletion pkg/syncer/syncer.go
@@ -98,7 +98,7 @@ type ResourceSyncer struct {
}

func NewResourceSyncer(cluster string, dynamicClient dynamic.Interface, rsr v1beta1.ResourceSyncRule, storage storage.Storage) *ResourceSyncer {
source := NewSource(cluster, dynamicClient, rsr)
source := NewSource(cluster, dynamicClient, rsr, storage)
return &ResourceSyncer{
source: source,
storage: storage,
95 changes: 95 additions & 0 deletions pkg/syncer/utils/elasticsearch_getter.go
@@ -0,0 +1,95 @@
package utils

import (
    "context"
    "fmt"
    "strings"

    "github.com/KusionStack/karbour/pkg/infra/search/storage/elasticsearch"
    "github.com/aquasecurity/esquery"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    k8scache "k8s.io/client-go/tools/cache"
)

var _ k8scache.KeyListerGetter = &ESListerGetter{}

type ESListerGetter struct {
    cluster  string
    esClient *elasticsearch.ESClient
    gvr      schema.GroupVersionResource
}

func NewESListerGetter(cluster string, esClient *elasticsearch.ESClient, gvr schema.GroupVersionResource) *ESListerGetter {
    return &ESListerGetter{
        cluster:  cluster,
        esClient: esClient,
        gvr:      gvr,
    }
}

func (e *ESListerGetter) ListKeys() []string {
    resource := e.gvr.Resource
    kind := resource[0 : len(resource)-1]
    query := make(map[string]interface{})
    query["query"] = esquery.Bool().Must(
        esquery.Term("cluster", e.cluster),
        esquery.Term("apiVersion", e.gvr.GroupVersion().String()),
        esquery.Term("kind", kind),
    ).Map()
    sr, err := e.esClient.SearchByQuery(context.Background(), query, nil)
    if err != nil {
        return nil
    }
    rt := []string{}
    for _, r := range sr.GetResources() {
        name, _, _ := unstructured.NestedString(r.Object, "metadata", "name")
        ns, _, _ := unstructured.NestedString(r.Object, "metadata", "namespace")
        var key string
        if ns != "" && name != "" {
            key = ns + "/" + name
        } else if name != "" {
            key = name
        }
        if key != "" {
            rt = append(rt, key)
        }
    }
    return rt
}

func (e *ESListerGetter) GetByKey(key string) (value interface{}, exists bool, err error) {
    s := strings.Split(key, "/")
    var name, ns string
    switch len(s) {
    case 1:
        name = s[0]
    case 2:
        ns = s[0]
        name = s[1]
    default:
        return nil, false, fmt.Errorf("invalid key:%s", key)
    }

    resource := e.gvr.Resource
    kind := resource[0 : len(resource)-1]
    query := make(map[string]interface{})
    query["query"] = esquery.Bool().Must(
        esquery.Term("cluster", e.cluster),
        esquery.Term("apiVersion", e.gvr.GroupVersion().String()),
        esquery.Term("kind", kind),
        esquery.Term("namespace", ns),
        esquery.Term("name", name)).Map()
    sr, err := e.esClient.SearchByQuery(context.Background(), query, nil)
    if err != nil {
        return nil, false, err
    }
    resources := sr.GetResources()
    if len(resources) != 1 {
        return nil, false, fmt.Errorf("query result expected 1, got %d", len(resources))
    }

    unObj := &unstructured.Unstructured{}
    unObj.SetUnstructuredContent(resources[0].Object)
    return unObj, true, nil
}
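To make the getter's contract concrete, here is a small illustrative driver (not part of the commit) that exercises it the way DeltaFIFO does during its initial Replace; the cluster name and the pods GVR are placeholders, and an already-constructed `*elasticsearch.ESClient` is assumed. One caveat visible in the code above: the kind used in the ES term query is derived by trimming a single trailing character from the resource name ("pods" becomes "pod"), so resources with irregular plurals (for example "ingresses" or "endpoints") will not map back to their kind this way.

```go
package example

import (
    "fmt"

    "github.com/KusionStack/karbour/pkg/infra/search/storage/elasticsearch"
    "github.com/KusionStack/karbour/pkg/syncer/utils"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// printKnownObjects lists every key the ES-backed store knows for one GVR and
// fetches each object back, mimicking DeltaFIFO's use of KnownObjects.
func printKnownObjects(es *elasticsearch.ESClient) {
    gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
    getter := utils.NewESListerGetter("my-cluster", es, gvr)

    for _, key := range getter.ListKeys() { // keys look like "namespace/name" or "name"
        obj, exists, err := getter.GetByKey(key)
        if err != nil || !exists {
            continue
        }
        u := obj.(*unstructured.Unstructured)
        fmt.Printf("%s (resourceVersion %s)\n", key, u.GetResourceVersion())
    }
}
```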
