diff --git a/internal/task/init_rainbond_cluster.go b/internal/task/init_rainbond_cluster.go index 357f84c..d2c87c3 100644 --- a/internal/task/init_rainbond_cluster.go +++ b/internal/task/init_rainbond_cluster.go @@ -22,7 +22,12 @@ import ( "context" "encoding/json" "fmt" + "goodrain.com/cloud-adaptor/internal/adaptor/rke" + "k8s.io/apimachinery/pkg/fields" + ktype "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "runtime/debug" + "strings" "time" rainbondv1alpha1 "github.com/goodrain/rainbond-operator/api/v1alpha1" @@ -30,11 +35,11 @@ import ( "github.com/rancher/rke/k8s" "github.com/sirupsen/logrus" apiv1 "goodrain.com/cloud-adaptor/api/cloud-adaptor/v1" + ccv1 "goodrain.com/cloud-adaptor/api/cloud-adaptor/v1" "goodrain.com/cloud-adaptor/internal/adaptor/factory" "goodrain.com/cloud-adaptor/internal/types" "goodrain.com/cloud-adaptor/internal/usecase" "goodrain.com/cloud-adaptor/pkg/util/constants" - "goodrain.com/cloud-adaptor/pkg/util/versionutil" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -52,87 +57,172 @@ func (c *InitRainbondCluster) rollback(step, message, status string) { c.result <- apiv1.Message{StepType: step, Message: message, Status: status} } -func (c *InitRainbondCluster) Rollback(step, message, status string) { - if status == "failure" { - logrus.Errorf("%s failure, Message: %s", step, message) +// CheckKubernetesStatus Check kubernetes status +func (c *InitRainbondCluster) CheckKubernetesStatus(clientset *kubernetes.Clientset) (bool, error) { + nodeList, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + if err != nil { + return false, err } - c.result <- apiv1.Message{StepType: step, Message: message, Status: status} + if len(nodeList.Items) == 0 { + return false, nil + } + return true, err } -// Run run take time 214.10s -func (c *InitRainbondCluster) Run(ctx context.Context) { - defer c.rollback("Close", "", "") - c.rollback("Init", "", "start") - // create adaptor - adaptor, err := factory.GetCloudFactory().GetRainbondClusterAdaptor(c.config.Provider, c.config.AccessKey, c.config.SecretKey) - if err != nil { - c.rollback("Init", fmt.Sprintf("create cloud adaptor failure %s", err.Error()), "failure") - return - } +func (c *InitRainbondCluster) CheckOperatorStatus(ctx context.Context, clientset *kubernetes.Clientset) error { + //通过一个定时器来控制检测时间 + ticker := time.NewTicker(time.Second * 5) + timer := time.NewTimer(time.Minute * 60) + defer ticker.Stop() + defer timer.Stop() + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancel") + case <-ticker.C: + case <-timer.C: + return nil + } - c.rollback("Init", "cloud adaptor create success", "success") - c.rollback("CheckCluster", "", "start") - // get kubernetes cluster info - cluster, err := adaptor.DescribeCluster(c.config.EnterpriseID, c.config.ClusterID) - if err != nil { - cluster, err = adaptor.DescribeCluster(c.config.EnterpriseID, c.config.ClusterID) + roPods, err := clientset.CoreV1().Pods(constants.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fields.SelectorFromSet(map[string]string{ + "release": "rainbond", + }).String(), + }) if err != nil { - c.rollback("CheckCluster", err.Error(), "failure") - return + return fmt.Errorf("get rainbond-operator pod failed:%s", err) + } + + if len(roPods.Items) == 0 { + continue + } + if roPods.Items[0].Status.Phase == "Running" { + break } } - // check cluster status - if cluster.State != "running" { - c.rollback("CheckCluster", fmt.Sprintf("cluster status is %s,not support init rainbond", cluster.State), "failure") - return + c.rollback("InitRainbondOperator", "", "success") + return nil +} + +func (c *InitRainbondCluster) CheckClusterStatus(ctx context.Context) error { + adapter, _ := rke.Create() + kubeConfig, err := adapter.GetKubeConfig(c.config.EnterpriseID, c.config.ClusterID) + _, runtimeClient, err := kubeConfig.GetKubeClient() + if err != nil { + logrus.Infof("get kubeclient failure %s", err.Error()) + return err } - // check cluster version - if !versionutil.CheckVersion(cluster.KubernetesVersion) { - c.rollback("CheckCluster", fmt.Sprintf("current cluster version is %s, init rainbond support kubernetes version is 1.19.x-1.29.x", cluster.KubernetesVersion), "failure") - return + + var cluster rainbondv1alpha1.RainbondCluster + ticker := time.NewTicker(time.Second * 10) + timer := time.NewTimer(time.Minute * 60) + defer timer.Stop() + defer ticker.Stop() + var initRainbondCluster bool + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancel") + case <-ticker.C: + case <-timer.C: + return fmt.Errorf("check cluster status failure") + } + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second*5) + defer cancel2() + err = runtimeClient.Get(ctx2, ktype.NamespacedName{Name: constants.RainbondCluster, Namespace: constants.Namespace}, &cluster) + if err != nil { + logrus.Errorf("get cluster failure %s", err.Error()) + return err + } + //获取到cluster信息后,进行数据校验 + for _, condition := range cluster.Status.Conditions { + if condition.Type == rainbondv1alpha1.RainbondClusterConditionTypeStorage { + continue + } + + status, msg := c.HandleClusterStatus(cluster, condition.Type) + if strings.Contains(msg.Message, "not ready") { + break + } + if !status && msg.Message != "" { + c.rollback(msg.StepType, msg.Message, "failure") + return fmt.Errorf("get clusterType %s failure%s:", msg.StepType, msg.Message) + } + //更新状态为成功 + c.rollback(msg.StepType, "", "success") + logrus.Infof("get clusterType %s success", msg.StepType) + if condition.Type == "ImageRepository" { + initRainbondCluster = true + } + } + if initRainbondCluster { + break + } } - // check cluster connection status - logrus.Infof("init kubernetes url %s", cluster.MasterURL) - if cluster.MasterURL.APIServerEndpoint == "" { - c.rollback("CheckCluster", "cluster api not open eip,not support init rainbond", "failure") + c.rollback("InitRainbondCluster", "", "success") + + return nil +} + +func (c *InitRainbondCluster) HandleClusterStatus(cluster rainbondv1alpha1.RainbondCluster, clusterType rainbondv1alpha1.RainbondClusterConditionType) (status bool, msg ccv1.Message) { + //如果成功就更新状态 + if idx, condition := cluster.Status.GetCondition((clusterType)); idx != -1 && condition.Status == v1.ConditionTrue { + status = true + msg.StepType = string(condition.Type) + } else if condition.Status == v1.ConditionFalse { + // 拿到这里面的一些报错信息去展示,并且退出本次安装 + msg.Status = string(condition.Status) + msg.Message = condition.Message + msg.StepType = string(condition.Type) + status = false return } + return +} +// Run run take time 214.10s +func (c *InitRainbondCluster) Run(ctx context.Context) { + defer c.rollback("Close", "", "") + c.rollback("Init", "", "start") + adaptor, err := factory.GetCloudFactory().GetRainbondClusterAdaptor(c.config.Provider, c.config.AccessKey, c.config.SecretKey) kubeConfig, err := adaptor.GetKubeConfig(c.config.EnterpriseID, c.config.ClusterID) if err != nil { kubeConfig, err = adaptor.GetKubeConfig(c.config.EnterpriseID, c.config.ClusterID) if err != nil { + logrus.Errorf("get kubeconfig failure:%s", err.Error()) c.rollback("CheckCluster", fmt.Sprintf("get kube config failure %s", err.Error()), "failure") return } } - - // check cluster not init rainbond coreClient, _, err := kubeConfig.GetKubeClient() if err != nil { c.rollback("CheckCluster", fmt.Sprintf("get kube config failure %s", err.Error()), "failure") return } - // get cluster node lists - getctx, cancel := context.WithTimeout(ctx, time.Second*10) - nodes, err := coreClient.CoreV1().Nodes().List(getctx, metav1.ListOptions{}) + // 检测k8s状态 + status, err := c.CheckKubernetesStatus(coreClient) + if !status { + c.rollback("CheckKubernetes", fmt.Sprintf("Kubernetes connection failed %s", err.Error()), "failure") + logrus.Errorf("Kubernetes connection failed") + return + } + c.rollback("CheckKubernetes", c.config.ClusterID, "success") + + //安装后检测operator的状态 + err = c.CheckOperatorStatus(ctx, coreClient) if err != nil { - nodes, err = coreClient.CoreV1().Nodes().List(getctx, metav1.ListOptions{}) - cancel() - if err != nil { - logrus.Errorf("get kubernetes cluster node failure %s", err.Error()) - c.rollback("CheckCluster", "cluster node list can not found, please check cluster public access and account authorization", "failure") - return - } - } else { - cancel() + c.rollback("CheckOperator", fmt.Sprintf("operator check failed %s", err.Error()), "failure") + logrus.Errorf("operator detection failed %s", err.Error()) + return } - if len(nodes.Items) == 0 { - c.rollback("CheckCluster", "node num is 0, can not init rainbond", "failure") + //检测cluster的状态 + err = c.CheckClusterStatus(ctx) + if err != nil { + logrus.Errorf("detection failed cluster: %s", err) return } - c.rollback("CheckCluster", c.config.ClusterID, "success") + } // GetRainbondGatewayNodeAndChaosNodes get gateway nodes diff --git a/internal/usecase/cluster.go b/internal/usecase/cluster.go index 6df59a3..e8fae67 100644 --- a/internal/usecase/cluster.go +++ b/internal/usecase/cluster.go @@ -733,6 +733,13 @@ func (c *ClusterUsecase) CreateTaskEvent(em *v1.EventMessage) (*model.TaskEvent, } logrus.Infof("set init task %s status is inited", em.TaskID) } + if em.Message.StepType == "InitRainbondCluster" && em.Message.Status == "success" { + if err := initRainbondTaskRepo.UpdateStatus(em.EnterpriseID, em.TaskID, "inited"); err != nil && err != gorm.ErrRecordNotFound { + ctx.Rollback() + return nil, err + } + logrus.Infof("set init task %s status is inited", em.TaskID) + } if em.Message.StepType == "UpdateKubernetes" && em.Message.Status == "success" { if err := c.UpdateKubernetesTaskRepo.Transaction(ctx).UpdateStatus(em.EnterpriseID, em.TaskID, "complete"); err != nil && err != gorm.ErrRecordNotFound { ctx.Rollback() diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index 888b78e..9881687 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -29,4 +29,6 @@ const ( CloudUpdate = "cloud-update" // Namespace is the namespace for rainbond-operator and rainbond components Namespace = "rbd-system" + // RainbondCluster - + RainbondCluster = "rainbondcluster" )