diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 478b5cc1e..c427fbcae 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -51,6 +51,7 @@ const ( EtcdDataVolumeName = "etcd-data" EtcdListenClientPort = 2379 EtcdListenPeerPort = 2380 + EtcdSuffix = "-etcd-client" //controlplane kube-controller KubeControllerReplicas = 2 @@ -80,6 +81,10 @@ const ( // DeInitAction represents delete virtual cluster instance DeInitAction Action = "deInit" + //host_port_manager + HostPortsCMName = "kosmos-hostports" + HostPortsCMDataName = "config.yaml" + ManifestComponentsConfigmap = "components-manifest-cm" NodePoolConfigmap = "node-pool" NodeVirtualclusterState = "virtualcluster" diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/hostport_manager.go b/pkg/kubenest/controller/virtualcluster.node.controller/hostport_manager.go index aee51c25a..beba1de91 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/hostport_manager.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/hostport_manager.go @@ -8,6 +8,8 @@ import ( "gopkg.in/yaml.v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" ) /** @@ -46,13 +48,11 @@ type ClusterPort struct { } func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) { - //todo magic Value - hostPorts, err := client.CoreV1().ConfigMaps("kosmos-system").Get(context.TODO(), "kosmos-hostports", metav1.GetOptions{}) + hostPorts, err := client.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.HostPortsCMName, metav1.GetOptions{}) if err != nil { return nil, err } - //todo magic Value - yamlData, exist := hostPorts.Data["config.yaml"] + yamlData, exist := hostPorts.Data[constants.HostPortsCMDataName] if !exist { return nil, fmt.Errorf("hostports not found in configmap") } @@ -68,35 +68,72 @@ func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) { return manager, nil } -func (m *HostPortManager) AllocateHostIP(clusterName string) (int32, error) { +func (m *HostPortManager) AllocateHostPort(clusterName string) (int32, error) { m.lock.Lock() defer m.lock.Unlock() + + //使用临时变量存储原来的cm + oldHostPool := m.HostPortPool + for _, port := range m.HostPortPool.PortsPool { if !m.isPortAllocated(port) { m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName}) - m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName}) - return port, nil + err := updateConfigMapAndRollback(m, oldHostPool) + if err != nil { + return 0, err + } + return port, err } } - // todo 更新 cm return 0, fmt.Errorf("no available ports to allocate") } -func (m *HostPortManager) ReleaseHostIP(clusterName string) error { +func (m *HostPortManager) ReleaseHostPort(clusterName string) error { m.lock.Lock() defer m.lock.Unlock() + oldHostPool := m.HostPortPool + for i, cp := range m.HostPortPool.ClusterPorts { if cp.Cluster == clusterName { // Remove the entry from the slice m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts[:i], m.HostPortPool.ClusterPorts[i+1:]...) + err := updateConfigMapAndRollback(m, oldHostPool) + if err != nil { + return err + } return nil } } - // todo 更新 cm return fmt.Errorf("no port found for cluster %s", clusterName) } +func updateConfigMapAndRollback(m *HostPortManager, oldHostPool *HostPortPool) error { + data, err := yaml.Marshal(m.HostPortPool) + if err != nil { + m.HostPortPool = oldHostPool + return err + } + + configMap, err := m.kubeClient.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.HostPortsCMName, metav1.GetOptions{}) + if err != nil { + m.HostPortPool = oldHostPool + return err + } + + configMap.Data[constants.HostPortsCMDataName] = string(data) + + _, updateErr := m.kubeClient.CoreV1().ConfigMaps(constants.KosmosNs).Update(context.TODO(), configMap, metav1.UpdateOptions{}) + + if updateErr != nil { + // 回滚 HostPortPool + m.HostPortPool = oldHostPool + return updateErr + } + + return nil +} + func (m *HostPortManager) isPortAllocated(port int32) bool { for _, cp := range m.HostPortPool.ClusterPorts { if cp.Port == port { diff --git a/pkg/kubenest/controlplane/apiserver.go b/pkg/kubenest/controlplane/apiserver.go index 7df18505d..ca6cc2bc4 100644 --- a/pkg/kubenest/controlplane/apiserver.go +++ b/pkg/kubenest/controlplane/apiserver.go @@ -16,12 +16,12 @@ import ( ) func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error { - _, err := manager.AllocateHostIP(name) + port, err := manager.AllocateHostPort(name) if err != nil { return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err) } - if err := installAPIServer(client, name, namespace); err != nil { + if err := installAPIServer(client, name, namespace, port); err != nil { return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err) } return nil @@ -35,9 +35,9 @@ func DeleteVirtualClusterAPIServer(client clientset.Interface, name, namespace s return nil } -func installAPIServer(client clientset.Interface, name, namespace string) error { +func installAPIServer(client clientset.Interface, name, namespace string, port int32) error { imageRepository, imageVersion := util.GetImageMessage() - err, clusterIps := util.GetEtcdServiceClusterIp(namespace, client) + clusterIp, err := util.GetEtcdServiceClusterIp(namespace, name+constants.EtcdSuffix, client) if err != nil { return nil } @@ -47,17 +47,19 @@ func installAPIServer(client clientset.Interface, name, namespace string) error ServiceSubnet, VirtualClusterCertsSecret, EtcdCertsSecret string Replicas int32 EtcdListenClientPort int32 + ClusterPort int32 }{ DeploymentName: fmt.Sprintf("%s-%s", name, "apiserver"), Namespace: namespace, ImageRepository: imageRepository, Version: imageVersion, - EtcdClientService: clusterIps[1], + EtcdClientService: clusterIp, ServiceSubnet: constants.ApiServerServiceSubnet, VirtualClusterCertsSecret: fmt.Sprintf("%s-%s", name, "cert"), EtcdCertsSecret: fmt.Sprintf("%s-%s", name, "etcd-cert"), Replicas: constants.ApiServerReplicas, EtcdListenClientPort: constants.ApiServerEtcdListenClientPort, + ClusterPort: port, }) if err != nil { return fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err) diff --git a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go index b54f1a752..0f2c59ed4 100644 --- a/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go +++ b/pkg/kubenest/manifest/controlplane/apiserver/mainfests_deployment.go @@ -42,7 +42,7 @@ spec: - --kubelet-client-certificate=/etc/virtualcluster/pki/virtualCluster.crt - --kubelet-client-key=/etc/virtualcluster/pki/virtualCluster.key - --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname - - --secure-port=5443 + - --secure-port={{ .ClusterPort }} - --service-account-issuer=https://kubernetes.default.svc.cluster.local - --service-account-key-file=/etc/virtualcluster/pki/virtualCluster.key - --service-account-signing-key-file=/etc/virtualcluster/pki/virtualCluster.key @@ -64,7 +64,7 @@ spec: failureThreshold: 8 httpGet: path: /livez - port: 5443 + port: {{ .ClusterPort }} scheme: HTTPS initialDelaySeconds: 10 periodSeconds: 10 @@ -74,7 +74,7 @@ spec: failureThreshold: 3 httpGet: path: /readyz - port: 5443 + port: {{ .ClusterPort }} scheme: HTTPS initialDelaySeconds: 10 periodSeconds: 10 @@ -91,7 +91,7 @@ spec: - apiserver topologyKey: kubernetes.io/hostname ports: - - containerPort: 5443 + - containerPort: {{ .ClusterPort }} name: http protocol: TCP volumeMounts: diff --git a/pkg/kubenest/util/address.go b/pkg/kubenest/util/address.go index 2061a6e14..7e8a7175b 100644 --- a/pkg/kubenest/util/address.go +++ b/pkg/kubenest/util/address.go @@ -67,18 +67,16 @@ func GetServiceClusterIp(namespace string, client clientset.Interface) (error, [ return nil, clusterIps } -func GetEtcdServiceClusterIp(namespace string, client clientset.Interface) (error, []string) { - serviceLists, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{}) +func GetEtcdServiceClusterIp(namespace string, serviceName string, client clientset.Interface) (string, error) { + service, err := client.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) if err != nil { - return err, nil + return "", err } - var clusterIps []string - if serviceLists != nil { - for _, service := range serviceLists.Items { - if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" { - clusterIps = append(clusterIps, service.Spec.ClusterIP) - } - } + + // 检查服务是否是期望的类型并且具有有效的 ClusterIP + if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" { + return service.Spec.ClusterIP, nil } - return nil, clusterIps + + return "", fmt.Errorf("Service %s not found or does not have a valid ClusterIP for Etcd", serviceName) }