diff --git a/apis/milvus.io/v1beta1/components_types.go b/apis/milvus.io/v1beta1/components_types.go index 732d7b91..d1eb24a6 100644 --- a/apis/milvus.io/v1beta1/components_types.go +++ b/apis/milvus.io/v1beta1/components_types.go @@ -105,13 +105,17 @@ const ( ImageUpdateModeRollingDowngrade ImageUpdateMode = "rollingDowngrade" // ImageUpdateModeAll means all components' image will be updated immediately to spec.image ImageUpdateModeAll ImageUpdateMode = "all" + // ImageUpdateModeForceUpgrade means all components' image will be updated immediately to spec.image + // and kills the terminated pods to speed up the process + ImageUpdateModeForce ImageUpdateMode = "force" ) type MilvusComponents struct { ComponentSpec `json:",inline"` // ImageUpdateMode is how the milvus components' image should be updated. works only when rolling update is enabled. - // +kubebuilder:validation:Enum=rollingUpgrade;rollingDowngrade;all + // forceUpgrade will update all pods' image immediately, and kills the terminated pods to speed up the process + // +kubebuilder:validation:Enum=rollingUpgrade;rollingDowngrade;all;force // +kubebuilder:default:="rollingUpgrade" // +kubebuilder:validation:Optional ImageUpdateMode ImageUpdateMode `json:"imageUpdateMode,omitempty"` diff --git a/charts/milvus-operator/templates/crds.yaml b/charts/milvus-operator/templates/crds.yaml index 2908d08f..31a7a74b 100644 --- a/charts/milvus-operator/templates/crds.yaml +++ b/charts/milvus-operator/templates/crds.yaml @@ -1539,6 +1539,7 @@ spec: - rollingUpgrade - rollingDowngrade - all + - force type: string indexCoord: properties: @@ -8803,6 +8804,7 @@ spec: - rollingUpgrade - rollingDowngrade - all + - force type: string indexCoord: properties: diff --git a/config/crd/bases/milvus.io_milvusclusters.yaml b/config/crd/bases/milvus.io_milvusclusters.yaml index 7e9900e2..7a2163c8 100644 --- a/config/crd/bases/milvus.io_milvusclusters.yaml +++ b/config/crd/bases/milvus.io_milvusclusters.yaml @@ -1538,6 +1538,7 @@ spec: - rollingUpgrade - rollingDowngrade - all + - force type: string indexCoord: properties: diff --git a/config/crd/bases/milvus.io_milvuses.yaml b/config/crd/bases/milvus.io_milvuses.yaml index 00a2587a..0fe57e7c 100644 --- a/config/crd/bases/milvus.io_milvuses.yaml +++ b/config/crd/bases/milvus.io_milvuses.yaml @@ -2476,6 +2476,7 @@ spec: - rollingUpgrade - rollingDowngrade - all + - force type: string indexCoord: properties: diff --git a/deploy/manifests/deployment.yaml b/deploy/manifests/deployment.yaml index 796f4944..010a1134 100644 --- a/deploy/manifests/deployment.yaml +++ b/deploy/manifests/deployment.yaml @@ -1570,6 +1570,7 @@ spec: - rollingUpgrade - rollingDowngrade - all + - force type: string indexCoord: properties: @@ -8835,6 +8836,7 @@ spec: - rollingUpgrade - rollingDowngrade - all + - force type: string indexCoord: properties: diff --git a/pkg/controllers/component_condition.go b/pkg/controllers/component_condition.go index c7c4022c..445fd462 100644 --- a/pkg/controllers/component_condition.go +++ b/pkg/controllers/component_condition.go @@ -159,12 +159,12 @@ func GetComponentConditionGetter() ComponentConditionGetter { var singletonComponentConditionGetter ComponentConditionGetter = ComponentConditionGetterImpl{} -var CheckMilvusHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) { +var ListMilvusTerminatingPods = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) { opts := &client.ListOptions{ Namespace: mc.Namespace, } opts.LabelSelector = labels.SelectorFromSet(NewAppLabels(mc.Name)) - return HasTerminatingPodByListOpts(ctx, cli, mc, opts) + return listTerminatingPodByOpts(ctx, cli, opts) } var CheckComponentHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, component MilvusComponent) (bool, error) { @@ -172,43 +172,63 @@ var CheckComponentHasTerminatingPod = func(ctx context.Context, cli client.Clien Namespace: mc.Namespace, } opts.LabelSelector = labels.SelectorFromSet(NewComponentAppLabels(mc.Name, component.Name)) - return HasTerminatingPodByListOpts(ctx, cli, mc, opts) + list, err := listTerminatingPodByOpts(ctx, cli, opts) + if err != nil { + return false, err + } + return len(list.Items) > 0, nil +} + +func listMilvusPods(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) { + opts := &client.ListOptions{ + Namespace: mc.Namespace, + } + opts.LabelSelector = labels.SelectorFromSet(NewAppLabels(mc.Name)) + return listPodByOpts(ctx, cli, opts) } -var HasTerminatingPodByListOpts = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, opts *client.ListOptions) (bool, error) { +func listPodByOpts(ctx context.Context, cli client.Client, opts *client.ListOptions) (*corev1.PodList, error) { podList := &corev1.PodList{} if err := cli.List(ctx, podList, opts); err != nil { - return false, err + return nil, err + } + return podList, nil +} + +func filterTerminatingPod(podList *corev1.PodList) *corev1.PodList { + ret := &corev1.PodList{ + Items: make([]corev1.Pod, 0), } for _, pod := range podList.Items { if pod.DeletionTimestamp != nil { - return true, nil + ret.Items = append(ret.Items, pod) } } - return false, nil + return ret } -var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) { - podList := &corev1.PodList{} - opts := &client.ListOptions{ - Namespace: mc.Namespace, +func listTerminatingPodByOpts(ctx context.Context, cli client.Client, opts *client.ListOptions) (*corev1.PodList, error) { + podList, err := listPodByOpts(ctx, cli, opts) + if err != nil { + return nil, err } - opts.LabelSelector = labels.SelectorFromSet(map[string]string{ - AppLabelInstance: mc.GetName(), - AppLabelName: "milvus", - }) - if err := cli.List(ctx, podList, opts); err != nil { + return filterTerminatingPod(podList), nil +} + +var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) { + podList, err := listMilvusPods(ctx, cli, mc) + if err != nil { return false, err } if len(podList.Items) > 0 { logger := ctrl.LoggerFrom(ctx) logger.Info("milvus has pods not stopped", "pods count", len(podList.Items)) - return false, ExecKillIfTerminatingTooLong(ctx, podList) + return false, ExecKillIfTerminating(ctx, podList) } return true, nil } -func ExecKillIfTerminatingTooLong(ctx context.Context, podList *corev1.PodList) error { +func ExecKillIfTerminating(ctx context.Context, podList *corev1.PodList) error { // we use kubectl exec to kill milvus process, because tini ignore SIGKILL cli := rest.GetRestClient() var ret error diff --git a/pkg/controllers/component_condition_test.go b/pkg/controllers/component_condition_test.go index 3bcf42d1..9deb3db7 100644 --- a/pkg/controllers/component_condition_test.go +++ b/pkg/controllers/component_condition_test.go @@ -277,7 +277,7 @@ func TestExecKillIfTerminatingTooLong(t *testing.T) { Items: []corev1.Pod{{}, {}}, } t.Run("delete not sent yet", func(t *testing.T) { - err := ExecKillIfTerminatingTooLong(ctx, pods) + err := ExecKillIfTerminating(ctx, pods) assert.NoError(t, err) }) @@ -285,7 +285,7 @@ func TestExecKillIfTerminatingTooLong(t *testing.T) { pods.Items[0].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(2) - err := ExecKillIfTerminatingTooLong(ctx, pods) + err := ExecKillIfTerminating(ctx, pods) assert.NoError(t, err) }) @@ -294,7 +294,7 @@ func TestExecKillIfTerminatingTooLong(t *testing.T) { pods.Items[1].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-time.Hour)} mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", errors.New("test")).Times(1) mockRestClient.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", "", nil).Times(1) - err := ExecKillIfTerminatingTooLong(ctx, pods) + err := ExecKillIfTerminating(ctx, pods) assert.Error(t, err) }) } diff --git a/pkg/controllers/deployment_updater.go b/pkg/controllers/deployment_updater.go index 316532f7..cd01eacd 100644 --- a/pkg/controllers/deployment_updater.go +++ b/pkg/controllers/deployment_updater.go @@ -9,6 +9,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" ) type deploymentUpdater interface { @@ -232,6 +233,7 @@ func updateMilvusContainer(template *corev1.PodTemplateSpec, updater deploymentU if isCreating || !updater.GetMilvus().IsRollingUpdateEnabled() || // rolling update is disabled updater.GetMilvus().Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeAll || // image update mode is update all + updater.GetMilvus().Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeForceUpgrade || updater.RollingUpdateImageDependencyReady() { container.Image = mergedComSpec.Image } @@ -352,6 +354,16 @@ func (m milvusDeploymentUpdater) GetInitContainers() []corev1.Container { } func (m milvusDeploymentUpdater) GetDeploymentStrategy() appsv1.DeploymentStrategy { + if m.Milvus.Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeForceUpgrade { + all := intstr.FromString("100%") + return appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxSurge: &all, + MaxUnavailable: &all, + }, + } + } return m.component.GetDeploymentStrategy(m.Milvus.Spec.Conf.Data) } diff --git a/pkg/controllers/status_cluster.go b/pkg/controllers/status_cluster.go index fbec2629..abbe0ddd 100644 --- a/pkg/controllers/status_cluster.go +++ b/pkg/controllers/status_cluster.go @@ -299,17 +299,10 @@ func (r *MilvusStatusSyncer) UpdateStatusForNewGeneration(ctx context.Context, m return err } UpdateCondition(&mc.Status, milvusCond) - updatedCond := GetMilvusUpdatedCondition(mc) - hasTerminatingPod, err := CheckMilvusHasTerminatingPod(ctx, r.Client, *mc) + err = r.handleTerminatingPods(ctx, mc) if err != nil { - return err - } - if hasTerminatingPod { - updatedCond.Status = corev1.ConditionFalse - updatedCond.Reason = v1beta1.ReasonMilvusComponentsUpdating - updatedCond.Message = v1beta1.MsgMilvusHasTerminatingPods + return errors.Wrap(err, "handle terminating pods failed") } - UpdateCondition(&mc.Status, updatedCond) statusInfo := MilvusHealthStatusInfo{ LastState: mc.Status.Status, @@ -325,6 +318,30 @@ func (r *MilvusStatusSyncer) UpdateStatusForNewGeneration(ctx context.Context, m return r.Status().Update(ctx, mc) } +func (r *MilvusStatusSyncer) handleTerminatingPods(ctx context.Context, mc *v1beta1.Milvus) error { + updatedCond := GetMilvusUpdatedCondition(mc) + terminatingPodList, err := ListMilvusTerminatingPods(ctx, r.Client, *mc) + if err != nil { + return err + } + hasTerminatingPod := len(terminatingPodList.Items) > 0 + if hasTerminatingPod { + updatedCond.Status = corev1.ConditionFalse + updatedCond.Reason = v1beta1.ReasonMilvusComponentsUpdating + updatedCond.Message = v1beta1.MsgMilvusHasTerminatingPods + + if mc.Spec.Com.ImageUpdateMode == v1beta1.ImageUpdateModeForceUpgrade { + err := ExecKillIfTerminating(ctx, terminatingPodList) + if err != nil { + // not fatal, so we just print it + r.logger.Error(err, "kill terminating pod failed") + } + } + } + UpdateCondition(&mc.Status, updatedCond) + return nil +} + var replicaUpdater replicaUpdaterInterface = new(replicaUpdaterImpl) type replicaUpdaterInterface interface { diff --git a/pkg/controllers/status_cluster_test.go b/pkg/controllers/status_cluster_test.go index d56ab3ed..8a0bbec5 100644 --- a/pkg/controllers/status_cluster_test.go +++ b/pkg/controllers/status_cluster_test.go @@ -241,12 +241,12 @@ func TestStatusSyncer_UpdateStatusRoutine(t *testing.T) { s := NewMilvusStatusSyncer(ctx, mockCli, logger) // mock hasTerminatingPod - bak := CheckMilvusHasTerminatingPod - CheckMilvusHasTerminatingPod = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) { - return false, nil + bak := ListMilvusTerminatingPods + ListMilvusTerminatingPods = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (*corev1.PodList, error) { + return &corev1.PodList{}, nil } defer func() { - CheckMilvusHasTerminatingPod = bak + ListMilvusTerminatingPods = bak }() // default status not set