
Add force mode for component image upgrade
Signed-off-by: shaoyue.chen <[email protected]>
haorenfsa committed Mar 26, 2024
1 parent cdce33d commit fae2377
Showing 10 changed files with 94 additions and 35 deletions.
6 changes: 5 additions & 1 deletion apis/milvus.io/v1beta1/components_types.go
@@ -105,13 +105,17 @@ const (
ImageUpdateModeRollingDowngrade ImageUpdateMode = "rollingDowngrade"
// ImageUpdateModeAll means all components' image will be updated immediately to spec.image
ImageUpdateModeAll ImageUpdateMode = "all"
// ImageUpdateModeForce means all components' images will be updated immediately to spec.image,
// and pods stuck terminating will be killed to speed up the process
ImageUpdateModeForce ImageUpdateMode = "force"
)

type MilvusComponents struct {
ComponentSpec `json:",inline"`

// ImageUpdateMode is how the milvus components' images should be updated. Works only when rolling update is enabled.
// +kubebuilder:validation:Enum=rollingUpgrade;rollingDowngrade;all
// force updates all pods' images immediately and kills pods stuck terminating to speed up the process
// +kubebuilder:validation:Enum=rollingUpgrade;rollingDowngrade;all;force
// +kubebuilder:default:="rollingUpgrade"
// +kubebuilder:validation:Optional
ImageUpdateMode ImageUpdateMode `json:"imageUpdateMode,omitempty"`
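For reference, a minimal Go sketch of how a client might opt into the new mode. This is an illustration only: the import path is assumed from the repository layout (apis/milvus.io/v1beta1), and the release name and image tag are hypothetical.

package main

import (
	"fmt"

	// Import path assumed from the repository layout; adjust if the module path differs.
	"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
)

func main() {
	mc := v1beta1.Milvus{}
	mc.Name = "my-release" // hypothetical release name
	// Point all components at a new image and request force mode:
	// images are swapped immediately and pods stuck terminating are killed.
	mc.Spec.Com.Image = "milvusdb/milvus:v2.3.4" // hypothetical target image; field path assumed
	mc.Spec.Com.ImageUpdateMode = v1beta1.ImageUpdateModeForce
	fmt.Println(mc.Spec.Com.ImageUpdateMode) // force
}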
2 changes: 2 additions & 0 deletions charts/milvus-operator/templates/crds.yaml
@@ -1539,6 +1539,7 @@ spec:
- rollingUpgrade
- rollingDowngrade
- all
- force
type: string
indexCoord:
properties:
@@ -8803,6 +8804,7 @@ spec:
- rollingUpgrade
- rollingDowngrade
- all
- force
type: string
indexCoord:
properties:
1 change: 1 addition & 0 deletions config/crd/bases/milvus.io_milvusclusters.yaml
@@ -1538,6 +1538,7 @@ spec:
- rollingUpgrade
- rollingDowngrade
- all
- force
type: string
indexCoord:
properties:
1 change: 1 addition & 0 deletions config/crd/bases/milvus.io_milvuses.yaml
@@ -2476,6 +2476,7 @@ spec:
- rollingUpgrade
- rollingDowngrade
- all
- force
type: string
indexCoord:
properties:
2 changes: 2 additions & 0 deletions deploy/manifests/deployment.yaml
@@ -1570,6 +1570,7 @@ spec:
- rollingUpgrade
- rollingDowngrade
- all
- force
type: string
indexCoord:
properties:
@@ -8835,6 +8836,7 @@ spec:
- rollingUpgrade
- rollingDowngrade
- all
- force
type: string
indexCoord:
properties:
56 changes: 38 additions & 18 deletions pkg/controllers/component_condition.go
@@ -159,56 +159,76 @@ func GetComponentConditionGetter() ComponentConditionGetter {

var singletonComponentConditionGetter ComponentConditionGetter = ComponentConditionGetterImpl{}

var CheckMilvusHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
var ListMilvusTerminatingPods = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) {
opts := &client.ListOptions{
Namespace: mc.Namespace,
}
opts.LabelSelector = labels.SelectorFromSet(NewAppLabels(mc.Name))
return HasTerminatingPodByListOpts(ctx, cli, mc, opts)
return listTerminatingPodByOpts(ctx, cli, opts)
}

var CheckComponentHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, component MilvusComponent) (bool, error) {
opts := &client.ListOptions{
Namespace: mc.Namespace,
}
opts.LabelSelector = labels.SelectorFromSet(NewComponentAppLabels(mc.Name, component.Name))
return HasTerminatingPodByListOpts(ctx, cli, mc, opts)
list, err := listTerminatingPodByOpts(ctx, cli, opts)
if err != nil {
return false, err
}
return len(list.Items) > 0, nil
}

func listMilvusPods(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) {
opts := &client.ListOptions{
Namespace: mc.Namespace,
}
opts.LabelSelector = labels.SelectorFromSet(NewAppLabels(mc.Name))
return listPodByOpts(ctx, cli, opts)
}

var HasTerminatingPodByListOpts = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, opts *client.ListOptions) (bool, error) {
func listPodByOpts(ctx context.Context, cli client.Client, opts *client.ListOptions) (*corev1.PodList, error) {
podList := &corev1.PodList{}
if err := cli.List(ctx, podList, opts); err != nil {
return false, err
return nil, err
}
return podList, nil
}

func filterTerminatingPod(podList *corev1.PodList) *corev1.PodList {
ret := &corev1.PodList{
Items: make([]corev1.Pod, 0),
}
for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
return true, nil
ret.Items = append(ret.Items, pod)
}
}
return false, nil
return ret
}

var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
podList := &corev1.PodList{}
opts := &client.ListOptions{
Namespace: mc.Namespace,
func listTerminatingPodByOpts(ctx context.Context, cli client.Client, opts *client.ListOptions) (*corev1.PodList, error) {
podList, err := listPodByOpts(ctx, cli, opts)
if err != nil {
return nil, err
}
opts.LabelSelector = labels.SelectorFromSet(map[string]string{
AppLabelInstance: mc.GetName(),
AppLabelName: "milvus",
})
if err := cli.List(ctx, podList, opts); err != nil {
return filterTerminatingPod(podList), nil
}

var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
podList, err := listMilvusPods(ctx, cli, mc)
if err != nil {
return false, err
}
if len(podList.Items) > 0 {
logger := ctrl.LoggerFrom(ctx)
logger.Info("milvus has pods not stopped", "pods count", len(podList.Items))
return false, ExecKillIfTerminatingTooLong(ctx, podList)
return false, ExecKillIfTerminating(ctx, podList)
}
return true, nil
}

func ExecKillIfTerminatingTooLong(ctx context.Context, podList *corev1.PodList) error {
func ExecKillIfTerminating(ctx context.Context, podList *corev1.PodList) error {
// we use kubectl exec to kill the milvus process, because tini ignores SIGKILL
cli := rest.GetRestClient()
var ret error
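The refactor above replaces the boolean has-terminating-pod checks with list-then-filter helpers; a pod counts as terminating once its DeletionTimestamp is set. A self-contained sketch of that semantics follows (the helper and pod names are illustrative, not this package's API).

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// filterTerminating mirrors the filterTerminatingPod helper above:
// keep only pods whose DeletionTimestamp has been set.
func filterTerminating(podList *corev1.PodList) *corev1.PodList {
	ret := &corev1.PodList{Items: make([]corev1.Pod, 0)}
	for _, pod := range podList.Items {
		if pod.DeletionTimestamp != nil {
			ret.Items = append(ret.Items, pod)
		}
	}
	return ret
}

func main() {
	now := metav1.NewTime(time.Now())
	pods := &corev1.PodList{Items: []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "running"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "terminating", DeletionTimestamp: &now}},
	}}
	for _, p := range filterTerminating(pods).Items {
		fmt.Println(p.Name) // prints only "terminating"
	}
}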
6 changes: 3 additions & 3 deletions pkg/controllers/component_condition_test.go
@@ -277,15 +277,15 @@ func TestExecKillIfTerminatingTooLong(t *testing.T) {
Items: []corev1.Pod{{}, {}},
}
t.Run("delete not sent yet", func(t *testing.T) {
err := ExecKillIfTerminatingTooLong(ctx, pods)
err := ExecKillIfTerminating(ctx, pods)
assert.NoError(t, err)
})

t.Run("kill ok", func(t *testing.T) {
pods.Items[0].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)}
pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)}
mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(2)
err := ExecKillIfTerminatingTooLong(ctx, pods)
err := ExecKillIfTerminating(ctx, pods)
assert.NoError(t, err)
})

@@ -294,7 +294,7 @@ func TestExecKillIfTerminatingTooLong(t *testing.T) {
pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)}
mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", errors.New("test")).Times(1)
mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(1)
err := ExecKillIfTerminatingTooLong(ctx, pods)
err := ExecKillIfTerminating(ctx, pods)
assert.Error(t, err)
})
}
12 changes: 12 additions & 0 deletions pkg/controllers/deployment_updater.go
@@ -9,6 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)

type deploymentUpdater interface {
@@ -232,6 +233,7 @@ func updateMilvusContainer(template *corev1.PodTemplateSpec, updater deploymentU
if isCreating ||
!updater.GetMilvus().IsRollingUpdateEnabled() || // rolling update is disabled
updater.GetMilvus().Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeAll || // image update mode is update all
updater.GetMilvus().Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeForce || // force mode also updates images immediately

updater.RollingUpdateImageDependencyReady() {
container.Image = mergedComSpec.Image
}
@@ -352,6 +354,16 @@ func (m milvusDeploymentUpdater) GetInitContainers() []corev1.Container {
}

func (m milvusDeploymentUpdater) GetDeploymentStrategy() appsv1.DeploymentStrategy {
if m.Milvus.Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeForce {

all := intstr.FromString("100%")
return appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{
MaxSurge: &all,
MaxUnavailable: &all,
},
}
}
return m.component.GetDeploymentStrategy(m.Milvus.Spec.Conf.Data)
}

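The force branch above switches the Deployment to a rolling update with both maxSurge and maxUnavailable at 100%, so the controller may bring up a full replacement set and tear down every old pod in one step rather than pod by pod. A standalone sketch of that strategy, not tied to the updater type:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// forceStrategy mirrors the force-mode branch: 100% surge and 100%
// unavailability allow the whole ReplicaSet to be replaced at once.
func forceStrategy() appsv1.DeploymentStrategy {
	all := intstr.FromString("100%")
	return appsv1.DeploymentStrategy{
		Type: appsv1.RollingUpdateDeploymentStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDeployment{
			MaxSurge:       &all,
			MaxUnavailable: &all,
		},
	}
}

func main() {
	s := forceStrategy()
	fmt.Println(s.RollingUpdate.MaxSurge.String(), s.RollingUpdate.MaxUnavailable.String()) // 100% 100%
}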
35 changes: 26 additions & 9 deletions pkg/controllers/status_cluster.go
@@ -299,17 +299,10 @@ func (r *MilvusStatusSyncer) UpdateStatusForNewGeneration(ctx context.Context, m
return err
}
UpdateCondition(&mc.Status, milvusCond)
updatedCond := GetMilvusUpdatedCondition(mc)
hasTerminatingPod, err := CheckMilvusHasTerminatingPod(ctx, r.Client, *mc)
err = r.handleTerminatingPods(ctx, mc)
if err != nil {
return err
}
if hasTerminatingPod {
updatedCond.Status = corev1.ConditionFalse
updatedCond.Reason = v1beta1.ReasonMilvusComponentsUpdating
updatedCond.Message = v1beta1.MsgMilvusHasTerminatingPods
return errors.Wrap(err, "handle terminating pods failed")
}
UpdateCondition(&mc.Status, updatedCond)

statusInfo := MilvusHealthStatusInfo{
LastState: mc.Status.Status,
@@ -325,6 +318,30 @@
return r.Status().Update(ctx, mc)
}

func (r *MilvusStatusSyncer) handleTerminatingPods(ctx context.Context, mc *v1beta1.Milvus) error {
updatedCond := GetMilvusUpdatedCondition(mc)
terminatingPodList, err := ListMilvusTerminatingPods(ctx, r.Client, *mc)
if err != nil {
return err
}
hasTerminatingPod := len(terminatingPodList.Items) > 0
if hasTerminatingPod {
updatedCond.Status = corev1.ConditionFalse
updatedCond.Reason = v1beta1.ReasonMilvusComponentsUpdating
updatedCond.Message = v1beta1.MsgMilvusHasTerminatingPods

if mc.Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeForce {

err := ExecKillIfTerminating(ctx, terminatingPodList)
if err != nil {
// not fatal, so we just log it
r.logger.Error(err, "kill terminating pod failed")
}
}
}
UpdateCondition(&mc.Status, updatedCond)
return nil
}

var replicaUpdater replicaUpdaterInterface = new(replicaUpdaterImpl)

type replicaUpdaterInterface interface {
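handleTerminatingPods above keeps the updated condition False while any pod is still terminating, and only in force mode escalates to killing those pods. The general pattern of flipping a status condition can be sketched with the stock apimachinery helpers; the condition type, reason, and message strings below are illustrative, not the operator's own constants.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conditions []metav1.Condition

	// While terminating pods remain, hold the "updated" condition at False,
	// mirroring the flow in handleTerminatingPods.
	hasTerminatingPod := true
	cond := metav1.Condition{
		Type:   "MilvusUpdated",
		Status: metav1.ConditionTrue,
		Reason: "AllUpdated",
	}
	if hasTerminatingPod {
		cond.Status = metav1.ConditionFalse
		cond.Reason = "MilvusComponentsUpdating"
		cond.Message = "Milvus has terminating pods"
	}
	meta.SetStatusCondition(&conditions, cond)
	fmt.Printf("%s=%s (%s)\n", conditions[0].Type, conditions[0].Status, conditions[0].Reason)
}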
8 changes: 4 additions & 4 deletions pkg/controllers/status_cluster_test.go
@@ -241,12 +241,12 @@ func TestStatusSyncer_UpdateStatusRoutine(t *testing.T) {
s := NewMilvusStatusSyncer(ctx, mockCli, logger)

// mock hasTerminatingPod
bak := CheckMilvusHasTerminatingPod
CheckMilvusHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
return false, nil
bak := ListMilvusTerminatingPods
ListMilvusTerminatingPods = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) {
return &corev1.PodList{}, nil
}
defer func() {
CheckMilvusHasTerminatingPod = bak
ListMilvusTerminatingPods = bak
}()

// default status not set
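The test above swaps the package-level ListMilvusTerminatingPods function variable for a stub and restores it afterwards. A minimal standalone sketch of that back-up/override/restore pattern follows; the names are placeholders, not this package's API.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// listPods stands in for a package-level function variable such as
// ListMilvusTerminatingPods: production code calls through the variable,
// and tests temporarily swap it for a stub.
var listPods = func(ctx context.Context) (*corev1.PodList, error) {
	// A real implementation would query the API server here.
	return nil, fmt.Errorf("not wired to a cluster in this sketch")
}

func podsTerminating(ctx context.Context) (bool, error) {
	list, err := listPods(ctx)
	if err != nil {
		return false, err
	}
	return len(list.Items) > 0, nil
}

func main() {
	// Test-style override: back up, stub, and restore, like the
	// ListMilvusTerminatingPods swap in the test above.
	bak := listPods
	listPods = func(ctx context.Context) (*corev1.PodList, error) {
		return &corev1.PodList{Items: []corev1.Pod{{}}}, nil
	}
	defer func() { listPods = bak }()

	ok, err := podsTerminating(context.Background())
	fmt.Println(ok, err) // true <nil>
}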
