Skip to content

Commit

Permalink
feat(store): implement reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
Th3Shadowbroker committed Nov 7, 2024
1 parent ca28afd commit 4a2ce71
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 4 deletions.
4 changes: 3 additions & 1 deletion internal/k8s/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
)

type ResourceWatcher struct {
client dynamic.Interface
resourceConfig *config.ResourceConfiguration
informer cache.SharedIndexInformer
stopChan chan struct{}
Expand All @@ -34,6 +35,7 @@ func NewResourceWatcher(
var namespace = resourceConfig.Kubernetes.Namespace
var informer = createInformer(client, resource, namespace, reSyncPeriod)
var watcher = ResourceWatcher{
client: client,
resourceConfig: resourceConfig,
informer: informer,
stopChan: make(chan struct{}),
Expand Down Expand Up @@ -129,7 +131,7 @@ func (w *ResourceWatcher) delete(obj any) {
}

func (w *ResourceWatcher) Start() {
store.CurrentStore.InitializeResource(w.resourceConfig)
store.CurrentStore.InitializeResource(w.client, w.resourceConfig)

defer func() {
if err := recover(); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions internal/reconciliation/functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package reconciliation

import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

type Functions struct {
AddFunc func(obj *unstructured.Unstructured)
CountFunc func(mapName string) (int, error)
KeysFunc func(mapName string) ([]string, error)
}
89 changes: 89 additions & 0 deletions internal/reconciliation/reconciliation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package reconciliation

import (
"context"
"github.com/rs/zerolog/log"
"github.com/telekom/quasar/internal/config"
"github.com/telekom/quasar/internal/utils"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)

type Reconciliation struct {
client dynamic.Interface
resource *config.ResourceConfiguration
}

func NewReconciliation(client dynamic.Interface, resource *config.ResourceConfiguration) *Reconciliation {
return &Reconciliation{client, resource}
}

func (r *Reconciliation) Reconcile(reconcileFuncs Functions) {
resources, err := r.client.Resource(r.resource.GetGroupVersionResource()).
Namespace(r.resource.Kubernetes.Namespace).
List(context.Background(), v1.ListOptions{})

if err != nil {
log.Error().Err(err).Fields(map[string]any{
"cache": r.resource.GetCacheName(),
}).Msg("Could not retrieve resources from cluster")
return
}

resourceCount := len(resources.Items)
storeSize, err := reconcileFuncs.CountFunc(r.resource.GetCacheName())
if err != nil {
log.Error().Err(err).Fields(map[string]any{
"cache": r.resource.GetCacheName(),
}).Msg("Could not get size of store")
return
}

log.Info().Fields(map[string]any{
"cache": r.resource.GetCacheName(),
"storeSize": storeSize,
"resourceCount": resourceCount,
}).Msg("Checking for store size mismatch...")

if storeSize < resourceCount {
log.Warn().Fields(map[string]any{
"cache": r.resource.GetCacheName(),
}).Msg("Store size does not match resource count. Generating diff for reconciliation...")

storeKeys, err := reconcileFuncs.KeysFunc(r.resource.GetCacheName())
if err != nil {
log.Error().Err(err).Msg("Could no retrieve store keys")
}

missingEntries := r.generateDiff(resources.Items, storeKeys)
log.Warn().Msgf("Identified %d missing cache entries. Reprocessing...", len(missingEntries))
for _, entry := range missingEntries {
reconcileFuncs.AddFunc(&entry)
log.Warn().Fields(utils.CreateFieldsForOp("restore", &entry)).Msg("Restored")
}
}
}

func (r *Reconciliation) generateDiff(resources []unstructured.Unstructured, storeKeys []string) []unstructured.Unstructured {
var diff = make([]unstructured.Unstructured, 0)
for _, resource := range resources {
found := false
for _, storeKey := range storeKeys {
if resource.GetName() == storeKey {
found = true
break
}
}

if !found {
diff = append(diff, resource)
}
}

return diff
}
23 changes: 22 additions & 1 deletion internal/store/hazelcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ package store
import (
"context"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/cluster"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/rs/zerolog/log"
"github.com/telekom/quasar/internal/config"
"github.com/telekom/quasar/internal/mongo"
reconciler "github.com/telekom/quasar/internal/reconciliation"
"github.com/telekom/quasar/internal/utils"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)

type HazelcastStore struct {
Expand All @@ -29,6 +32,7 @@ func (s *HazelcastStore) Initialize() {
hazelcastConfig.Cluster.Security.Credentials.Username = config.Current.Store.Hazelcast.Username
hazelcastConfig.Cluster.Security.Credentials.Password = config.Current.Store.Hazelcast.Password
hazelcastConfig.Cluster.Network.Addresses = config.Current.Store.Hazelcast.Addresses
hazelcastConfig.Cluster.Unisocket = true
hazelcastConfig.Logger.CustomLogger = new(utils.HazelcastZerologLogger)

s.ctx = context.Background()
Expand All @@ -42,7 +46,7 @@ func (s *HazelcastStore) Initialize() {
}
}

func (s *HazelcastStore) InitializeResource(resourceConfig *config.ResourceConfiguration) {
func (s *HazelcastStore) InitializeResource(kubernetesClient dynamic.Interface, resourceConfig *config.ResourceConfiguration) {
if s.wtClient != nil {
s.wtClient.EnsureIndexesOfResource(resourceConfig)
}
Expand All @@ -63,6 +67,23 @@ func (s *HazelcastStore) InitializeResource(resourceConfig *config.ResourceConfi
}).Err(err).Msg("Could not create hazelcast index")
}
}

var reconciliation = reconciler.NewReconciliation(kubernetesClient, resourceConfig)
_, err = s.client.AddMembershipListener(func(event cluster.MembershipStateChanged) {
if event.State == cluster.MembershipStateRemoved {
reconciliation.Reconcile(reconciler.Functions{
AddFunc: s.OnAdd,
CountFunc: s.Count,
KeysFunc: s.Keys,
})
}
})

if err != nil {
log.Error().Err(err).Fields(map[string]any{
"cache": resourceConfig.GetCacheName(),
}).Msg("Could not register membership listener for reconciliation")
}
}

func (s *HazelcastStore) OnAdd(obj *unstructured.Unstructured) {
Expand Down
3 changes: 2 additions & 1 deletion internal/store/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/telekom/quasar/internal/config"
"github.com/telekom/quasar/internal/utils"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)

type RedisStore struct {
Expand All @@ -37,7 +38,7 @@ func (s *RedisStore) Initialize() {
log.Info().Msg("Redis connection established...")
}

func (s *RedisStore) InitializeResource(resourceConfig *config.ResourceConfiguration) {
func (s *RedisStore) InitializeResource(kubernetesClient dynamic.Interface, resourceConfig *config.ResourceConfiguration) {
// Nothing to do here
}

Expand Down
3 changes: 2 additions & 1 deletion internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"github.com/rs/zerolog/log"
"github.com/telekom/quasar/internal/config"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"strings"
)

var CurrentStore Store

type Store interface {
Initialize()
InitializeResource(resourceConfig *config.ResourceConfiguration)
InitializeResource(kubernetesClient dynamic.Interface, resourceConfig *config.ResourceConfiguration)
OnAdd(obj *unstructured.Unstructured)
OnUpdate(oldObj *unstructured.Unstructured, newObj *unstructured.Unstructured)
OnDelete(obj *unstructured.Unstructured)
Expand Down

0 comments on commit 4a2ce71

Please sign in to comment.