Skip to content

Commit

Permalink
Merge pull request kosmos-io#479 from qiuwei68/feature_hostports
Browse files Browse the repository at this point in the history
feat: add select host port for apiserver
  • Loading branch information
kosmos-robot authored Apr 24, 2024
2 parents 16c8815 + c578dda commit f3ca7a9
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 31 deletions.
5 changes: 5 additions & 0 deletions pkg/kubenest/constants/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
EtcdDataVolumeName = "etcd-data"
EtcdListenClientPort = 2379
EtcdListenPeerPort = 2380
EtcdSuffix = "-etcd-client"

//controlplane kube-controller
KubeControllerReplicas = 2
Expand Down Expand Up @@ -80,6 +81,10 @@ const (
// DeInitAction represents delete virtual cluster instance
DeInitAction Action = "deInit"

//host_port_manager
HostPortsCMName = "kosmos-hostports"
HostPortsCMDataName = "config.yaml"

ManifestComponentsConfigmap = "components-manifest-cm"
NodePoolConfigmap = "node-pool"
NodeShareState = "share"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
)

/**
Expand Down Expand Up @@ -46,13 +48,11 @@ type ClusterPort struct {
}

func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) {
//todo magic Value
hostPorts, err := client.CoreV1().ConfigMaps("kosmos-system").Get(context.TODO(), "kosmos-hostports", metav1.GetOptions{})
hostPorts, err := client.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.HostPortsCMName, metav1.GetOptions{})
if err != nil {
return nil, err
}
//todo magic Value
yamlData, exist := hostPorts.Data["config.yaml"]
yamlData, exist := hostPorts.Data[constants.HostPortsCMDataName]
if !exist {
return nil, fmt.Errorf("hostports not found in configmap")
}
Expand All @@ -68,35 +68,72 @@ func NewHostPortManager(client kubernetes.Interface) (*HostPortManager, error) {
return manager, nil
}

func (m *HostPortManager) AllocateHostIP(clusterName string) (int32, error) {
func (m *HostPortManager) AllocateHostPort(clusterName string) (int32, error) {
m.lock.Lock()
defer m.lock.Unlock()

//使用临时变量存储原来的cm
oldHostPool := m.HostPortPool

for _, port := range m.HostPortPool.PortsPool {
if !m.isPortAllocated(port) {
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts, ClusterPort{Port: port, Cluster: clusterName})
return port, nil
err := updateConfigMapAndRollback(m, oldHostPool)
if err != nil {
return 0, err
}
return port, err
}
}
// todo 更新 cm
return 0, fmt.Errorf("no available ports to allocate")
}

func (m *HostPortManager) ReleaseHostIP(clusterName string) error {
func (m *HostPortManager) ReleaseHostPort(clusterName string) error {
m.lock.Lock()
defer m.lock.Unlock()

oldHostPool := m.HostPortPool

for i, cp := range m.HostPortPool.ClusterPorts {
if cp.Cluster == clusterName {
// Remove the entry from the slice
m.HostPortPool.ClusterPorts = append(m.HostPortPool.ClusterPorts[:i], m.HostPortPool.ClusterPorts[i+1:]...)
err := updateConfigMapAndRollback(m, oldHostPool)
if err != nil {
return err
}
return nil
}
}
// todo 更新 cm
return fmt.Errorf("no port found for cluster %s", clusterName)
}

func updateConfigMapAndRollback(m *HostPortManager, oldHostPool *HostPortPool) error {
data, err := yaml.Marshal(m.HostPortPool)
if err != nil {
m.HostPortPool = oldHostPool
return err
}

configMap, err := m.kubeClient.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.HostPortsCMName, metav1.GetOptions{})
if err != nil {
m.HostPortPool = oldHostPool
return err
}

configMap.Data[constants.HostPortsCMDataName] = string(data)

_, updateErr := m.kubeClient.CoreV1().ConfigMaps(constants.KosmosNs).Update(context.TODO(), configMap, metav1.UpdateOptions{})

if updateErr != nil {
// 回滚 HostPortPool
m.HostPortPool = oldHostPool
return updateErr
}

return nil
}

func (m *HostPortManager) isPortAllocated(port int32) bool {
for _, cp := range m.HostPortPool.ClusterPorts {
if cp.Port == port {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) err

func (c *VirtualClusterInitController) Update(original, updated *v1alpha1.VirtualCluster) error {
now := metav1.Now()
updated.Status.TimeStamp = &now
updated.Status.UpdateTime = &now
return c.Client.Patch(context.TODO(), updated, client.MergeFrom(original))
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/kubenest/controlplane/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

func EnsureVirtualClusterAPIServer(client clientset.Interface, name, namespace string, manager *vcnodecontroller.HostPortManager) error {
_, err := manager.AllocateHostIP(name)
port, err := manager.AllocateHostPort(name)
if err != nil {
return fmt.Errorf("failed to allocate host ip for virtual cluster apiserver, err: %w", err)
}

if err := installAPIServer(client, name, namespace); err != nil {
if err := installAPIServer(client, name, namespace, port); err != nil {
return fmt.Errorf("failed to install virtual cluster apiserver, err: %w", err)
}
return nil
Expand All @@ -35,9 +35,9 @@ func DeleteVirtualClusterAPIServer(client clientset.Interface, name, namespace s
return nil
}

func installAPIServer(client clientset.Interface, name, namespace string) error {
func installAPIServer(client clientset.Interface, name, namespace string, port int32) error {
imageRepository, imageVersion := util.GetImageMessage()
err, clusterIps := util.GetEtcdServiceClusterIp(namespace, client)
clusterIp, err := util.GetEtcdServiceClusterIp(namespace, name+constants.EtcdSuffix, client)
if err != nil {
return nil
}
Expand All @@ -47,17 +47,19 @@ func installAPIServer(client clientset.Interface, name, namespace string) error
ServiceSubnet, VirtualClusterCertsSecret, EtcdCertsSecret string
Replicas int32
EtcdListenClientPort int32
ClusterPort int32
}{
DeploymentName: fmt.Sprintf("%s-%s", name, "apiserver"),
Namespace: namespace,
ImageRepository: imageRepository,
Version: imageVersion,
EtcdClientService: clusterIps[1],
EtcdClientService: clusterIp,
ServiceSubnet: constants.ApiServerServiceSubnet,
VirtualClusterCertsSecret: fmt.Sprintf("%s-%s", name, "cert"),
EtcdCertsSecret: fmt.Sprintf("%s-%s", name, "etcd-cert"),
Replicas: constants.ApiServerReplicas,
EtcdListenClientPort: constants.ApiServerEtcdListenClientPort,
ClusterPort: port,
})
if err != nil {
return fmt.Errorf("error when parsing virtual cluster apiserver deployment template: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ spec:
- --kubelet-client-certificate=/etc/virtualcluster/pki/virtualCluster.crt
- --kubelet-client-key=/etc/virtualcluster/pki/virtualCluster.key
- --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname
- --secure-port=5443
- --secure-port={{ .ClusterPort }}
- --service-account-issuer=https://kubernetes.default.svc.cluster.local
- --service-account-key-file=/etc/virtualcluster/pki/virtualCluster.key
- --service-account-signing-key-file=/etc/virtualcluster/pki/virtualCluster.key
Expand All @@ -64,7 +64,7 @@ spec:
failureThreshold: 8
httpGet:
path: /livez
port: 5443
port: {{ .ClusterPort }}
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
Expand All @@ -74,7 +74,7 @@ spec:
failureThreshold: 3
httpGet:
path: /readyz
port: 5443
port: {{ .ClusterPort }}
scheme: HTTPS
initialDelaySeconds: 10
periodSeconds: 10
Expand All @@ -91,7 +91,7 @@ spec:
- apiserver
topologyKey: kubernetes.io/hostname
ports:
- containerPort: 5443
- containerPort: {{ .ClusterPort }}
name: http
protocol: TCP
volumeMounts:
Expand Down
20 changes: 9 additions & 11 deletions pkg/kubenest/util/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,16 @@ func GetServiceClusterIp(namespace string, client clientset.Interface) (error, [
return nil, clusterIps
}

func GetEtcdServiceClusterIp(namespace string, client clientset.Interface) (error, []string) {
serviceLists, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{})
func GetEtcdServiceClusterIp(namespace string, serviceName string, client clientset.Interface) (string, error) {
service, err := client.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
return err, nil
return "", err
}
var clusterIps []string
if serviceLists != nil {
for _, service := range serviceLists.Items {
if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" {
clusterIps = append(clusterIps, service.Spec.ClusterIP)
}
}

// 检查服务是否是期望的类型并且具有有效的 ClusterIP
if service.Spec.Type == constants.EtcdServiceType && service.Spec.ClusterIP != "" {
return service.Spec.ClusterIP, nil
}
return nil, clusterIps

return "", fmt.Errorf("Service %s not found or does not have a valid ClusterIP for Etcd", serviceName)
}

0 comments on commit f3ca7a9

Please sign in to comment.