Skip to content

Commit

Permalink
move types to common package and define interface on consumer's side
Browse files Browse the repository at this point in the history
  • Loading branch information
Furkhat Kasymov Genii Uulu committed Apr 29, 2024
1 parent 5fc3c84 commit ba511f2
Show file tree
Hide file tree
Showing 44 changed files with 708 additions and 720 deletions.
87 changes: 48 additions & 39 deletions actions/actions.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Package actions polls, handles and acknowledges actions from mothership for a given cluster.
//
//go:generate mockgen -destination ./mock/client.go . Client
package actions

import (
Expand All @@ -14,22 +17,24 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/health"
"github.com/castai/cluster-controller/helm"
"github.com/castai/cluster-controller/types"
"github.com/castai/cluster-controller/waitext"
)

const (
// actionIDLogField is the log field name for action ID.
// This field is used by the backend to detect action IDs in logs.
actionIDLogField = "id"
labelNodeID = "provisioner.cast.ai/node-id"
)

// newUnexpectedTypeErr builds an error stating that the dynamic type of value
// does not match the dynamic type of expectedType. Only the types of both
// arguments are reported (via %T); their values are never printed.
func newUnexpectedTypeErr(value interface{}, expectedType interface{}) error {
	const format = "unexpected type %T, expected %T"
	return fmt.Errorf(format, value, expectedType)
}

// Config contains parameters to modify actions handling frequency and values required to poll/ack actions.
type Config struct {
PollWaitInterval time.Duration // How long to wait until next long polling request.
PollTimeout time.Duration // Hard timeout. Normally the server should return an empty result before this timeout.
Expand All @@ -41,67 +46,73 @@ type Config struct {
Namespace string
}

type Service interface {
Run(ctx context.Context)
// Client abstracts communication means.
// It is the set of calls this package needs from the client passed to NewService:
// fetching actions, acknowledging them and sending AKS init data.
type Client interface {
// GetActions returns the cluster actions to process. k8sVersion is forwarded
// as-is by the caller (presumably the cluster's Kubernetes version; confirm
// against the implementation).
GetActions(ctx context.Context, k8sVersion string) ([]*types.ClusterAction, error)
// AckAction acknowledges the action identified by actionID. errMessage optionally
// carries an error message for that action (presumably nil when handling
// succeeded; confirm against the implementation).
AckAction(ctx context.Context, actionID string, errMessage *string) error
// SendAKSInitData sends AKS init data. Judging by the parameter names, the cloud
// config and protected settings are expected to be base64-encoded; verify
// against the implementation.
SendAKSInitData(ctx context.Context, cloudConfigBase64, protectedSettingsBase64, architecture string) error
}

type ActionHandler interface {
Handle(ctx context.Context, action *castai.ClusterAction) error
// actionHandler handles a single kind of cluster action.
// Handlers are stored in a map keyed by the reflect.Type of the action data
// they are responsible for.
type actionHandler interface {
// Handle processes the given action; the returned error reports a handling failure.
Handle(ctx context.Context, action *types.ClusterAction) error
}

// NewService returns new Service that can continuously handle actions once started.
func NewService(
log logrus.FieldLogger,
cfg Config,
k8sVersion string,
clientset *kubernetes.Clientset,
dynamicClient dynamic.Interface,
castaiClient castai.ActionsClient,
castaiClient Client,
helmClient helm.Client,
healthCheck *health.HealthzProvider,
) Service {
return &service{
) *Service {
return &Service{
log: log,
cfg: cfg,
k8sVersion: k8sVersion,
castAIClient: castaiClient,
startedActions: map[string]struct{}{},
actionHandlers: map[reflect.Type]ActionHandler{
reflect.TypeOf(&castai.ActionDeleteNode{}): newDeleteNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionDrainNode{}): newDrainNodeHandler(log, clientset, cfg.Namespace),
reflect.TypeOf(&castai.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionCreateEvent{}): newCreateEventHandler(log, clientset),
reflect.TypeOf(&castai.ActionApproveCSR{}): newApproveCSRHandler(log, clientset),
reflect.TypeOf(&castai.ActionChartUpsert{}): newChartUpsertHandler(log, helmClient),
reflect.TypeOf(&castai.ActionChartUninstall{}): newChartUninstallHandler(log, helmClient),
reflect.TypeOf(&castai.ActionChartRollback{}): newChartRollbackHandler(log, helmClient, cfg.Version),
reflect.TypeOf(&castai.ActionDisconnectCluster{}): newDisconnectClusterHandler(log, clientset),
reflect.TypeOf(&castai.ActionSendAKSInitData{}): newSendAKSInitDataHandler(log, castaiClient),
reflect.TypeOf(&castai.ActionCheckNodeDeleted{}): newCheckNodeDeletedHandler(log, clientset),
reflect.TypeOf(&castai.ActionCheckNodeStatus{}): newCheckNodeStatusHandler(log, clientset),
reflect.TypeOf(&castai.ActionPatch{}): newPatchHandler(log, dynamicClient),
reflect.TypeOf(&castai.ActionCreate{}): newCreateHandler(log, dynamicClient),
reflect.TypeOf(&castai.ActionDelete{}): newDeleteHandler(log, dynamicClient),
actionHandlers: map[reflect.Type]actionHandler{
reflect.TypeOf(&types.ActionDeleteNode{}): newDeleteNodeHandler(log, clientset),
reflect.TypeOf(&types.ActionDrainNode{}): newDrainNodeHandler(log, clientset, cfg.Namespace),
reflect.TypeOf(&types.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&types.ActionCreateEvent{}): newCreateEventHandler(log, clientset),
reflect.TypeOf(&types.ActionApproveCSR{}): newApproveCSRHandler(log, clientset),
reflect.TypeOf(&types.ActionChartUpsert{}): newChartUpsertHandler(log, helmClient),
reflect.TypeOf(&types.ActionChartUninstall{}): newChartUninstallHandler(log, helmClient),
reflect.TypeOf(&types.ActionChartRollback{}): newChartRollbackHandler(log, helmClient, cfg.Version),
reflect.TypeOf(&types.ActionDisconnectCluster{}): newDisconnectClusterHandler(log, clientset),
reflect.TypeOf(&types.ActionSendAKSInitData{}): newSendAKSInitDataHandler(log, castaiClient),
reflect.TypeOf(&types.ActionCheckNodeDeleted{}): newCheckNodeDeletedHandler(log, clientset),
reflect.TypeOf(&types.ActionCheckNodeStatus{}): newCheckNodeStatusHandler(log, clientset),
reflect.TypeOf(&types.ActionPatch{}): newPatchHandler(log, dynamicClient),
reflect.TypeOf(&types.ActionCreate{}): newCreateHandler(log, dynamicClient),
reflect.TypeOf(&types.ActionDelete{}): newDeleteHandler(log, dynamicClient),
},
healthCheck: healthCheck,
}
}

type service struct {
// Service can continuously poll and handle actions.
type Service struct {
log logrus.FieldLogger
cfg Config
castAIClient castai.ActionsClient
castAIClient Client

k8sVersion string

actionHandlers map[reflect.Type]ActionHandler
actionHandlers map[reflect.Type]actionHandler

startedActionsWg sync.WaitGroup
startedActions map[string]struct{}
startedActionsMu sync.Mutex
healthCheck *health.HealthzProvider
}

func (s *service) Run(ctx context.Context) {
// Run starts polling and handling actions.
func (s *Service) Run(ctx context.Context) {
s.healthCheck.Initializing()
for {
select {
Expand All @@ -123,11 +134,11 @@ func (s *service) Run(ctx context.Context) {
}
}

func (s *service) doWork(ctx context.Context) error {
func (s *Service) doWork(ctx context.Context) error {
s.log.Info("polling actions")
start := time.Now()
var (
actions []*castai.ClusterAction
actions []*types.ClusterAction
err error
iteration int
)
Expand Down Expand Up @@ -161,13 +172,13 @@ func (s *service) doWork(ctx context.Context) error {
return nil
}

func (s *service) handleActions(ctx context.Context, actions []*castai.ClusterAction) {
func (s *Service) handleActions(ctx context.Context, actions []*types.ClusterAction) {
for _, action := range actions {
if !s.startProcessing(action.ID) {
continue
}

go func(action *castai.ClusterAction) {
go func(action *types.ClusterAction) {
defer s.finishProcessing(action.ID)

var err error
Expand All @@ -193,15 +204,15 @@ func (s *service) handleActions(ctx context.Context, actions []*castai.ClusterAc
}
}

func (s *service) finishProcessing(actionID string) {
// finishProcessing marks the action with the given ID as no longer in flight:
// while holding startedActionsMu it signals startedActionsWg and then removes
// the ID from startedActions.
func (s *Service) finishProcessing(actionID string) {
	mu := &s.startedActionsMu
	mu.Lock()
	// Deferred so the lock is released even if Done panics.
	defer mu.Unlock()

	s.startedActionsWg.Done()
	delete(s.startedActions, actionID)
}

func (s *service) startProcessing(actionID string) bool {
func (s *Service) startProcessing(actionID string) bool {
s.startedActionsMu.Lock()
defer s.startedActionsMu.Unlock()

Expand All @@ -214,7 +225,7 @@ func (s *service) startProcessing(actionID string) bool {
return true
}

func (s *service) handleAction(ctx context.Context, action *castai.ClusterAction) (err error) {
func (s *Service) handleAction(ctx context.Context, action *types.ClusterAction) (err error) {
actionType := reflect.TypeOf(action.Data())

defer func() {
Expand All @@ -238,7 +249,7 @@ func (s *service) handleAction(ctx context.Context, action *castai.ClusterAction
return nil
}

func (s *service) ackAction(ctx context.Context, action *castai.ClusterAction, handleErr error) error {
func (s *Service) ackAction(ctx context.Context, action *types.ClusterAction, handleErr error) error {
actionType := reflect.TypeOf(action.Data())
s.log.WithFields(logrus.Fields{
actionIDLogField: action.ID,
Expand All @@ -250,9 +261,7 @@ func (s *service) ackAction(ctx context.Context, action *castai.ClusterAction, h
return waitext.Retry(ctx, boff, s.cfg.AckRetriesCount, func(ctx context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, s.cfg.AckTimeout)
defer cancel()
return true, s.castAIClient.AckAction(ctx, action.ID, &castai.AckClusterActionRequest{
Error: getHandlerError(handleErr),
})
return true, s.castAIClient.AckAction(ctx, action.ID, getHandlerError(handleErr))
}, func(err error) {
s.log.Debugf("ack failed, will retry: %v", err)
})
Expand Down
Loading

0 comments on commit ba511f2

Please sign in to comment.