From 6d4cdd2283b22a75e43f1b47f38d0495b0437700 Mon Sep 17 00:00:00 2001 From: GreatLazyMan Date: Wed, 29 Nov 2023 17:38:05 +0800 Subject: [PATCH] title: Add ipsec support Description: Sync code, adapt to ACK Signed-off-by: GreatLazyMan --- cmd/clusterlink/elector/app/elector.go | 7 +- deploy/crds/kosmos.io_clusternodes.yaml | 8 +- deploy/crds/kosmos.io_clusters.yaml | 6 + deploy/crds/kosmos.io_nodeconfigs.yaml | 46 +++++ pkg/apis/kosmos/v1alpha1/cluster_types.go | 4 + pkg/apis/kosmos/v1alpha1/clusternode_types.go | 5 +- pkg/apis/kosmos/v1alpha1/constants.go | 14 ++ pkg/apis/kosmos/v1alpha1/nodeconfig_types.go | 46 ++++- .../network-manager/network_manager.go | 51 ++++- .../calicoippool/calicoippool_controller.go | 8 +- .../controllers/cluster/cluster_controller.go | 59 +++++- .../controllers/node/node_controller.go | 18 ++ pkg/clusterlink/elector/elector.go | 63 +++++- .../network-manager/handlers/nodeconfig.go | 24 ++- .../network-manager/handlers/pod_routes.go | 86 ++++++++- pkg/clusterlink/network/adapter.go | 107 +++++++++++ pkg/clusterlink/network/interface.go | 6 + pkg/clusterlink/network/xfrm_policy.go | 180 ++++++++++++++++++ pkg/kosmosctl/install/install.go | 19 ++ pkg/kosmosctl/join/join.go | 20 +- pkg/kosmosctl/manifest/manifest_crds.go | 63 +++++- .../manifest/manifest_deployments.go | 6 +- 22 files changed, 797 insertions(+), 49 deletions(-) create mode 100644 pkg/clusterlink/network/xfrm_policy.go diff --git a/cmd/clusterlink/elector/app/elector.go b/cmd/clusterlink/elector/app/elector.go index e805615f1..334363e34 100644 --- a/cmd/clusterlink/elector/app/elector.go +++ b/cmd/clusterlink/elector/app/elector.go @@ -126,10 +126,11 @@ func run(ctx context.Context, opts *options.Options) error { err := elector.EnsureGateWayRole() if err != nil { klog.Errorf("set gateway role failure: %v, retry after 10 sec.", err) - time.Sleep(10 * time.Second) + time.Sleep(3 * time.Second) } else { - klog.V(4).Info("ensure gateway role success, recheck after 60 sec.") - time.Sleep(60 * time.Second) + timeToRecheck := 3 * time.Second + klog.V(4).Infof("ensure gateway role success, recheck after %d sec.", int(timeToRecheck)) + time.Sleep(timeToRecheck) } } } diff --git a/deploy/crds/kosmos.io_clusternodes.yaml b/deploy/crds/kosmos.io_clusternodes.yaml index fb61383c0..9d5dace88 100644 --- a/deploy/crds/kosmos.io_clusternodes.yaml +++ b/deploy/crds/kosmos.io_clusternodes.yaml @@ -45,6 +45,8 @@ spec: properties: clusterName: type: string + elasticip: + type: string interfaceName: type: string ip: @@ -63,11 +65,13 @@ spec: type: array type: object status: + properties: + nodeStatus: + type: string type: object required: - spec type: object served: true storage: true - subresources: - status: {} + subresources: {} diff --git a/deploy/crds/kosmos.io_clusters.yaml b/deploy/crds/kosmos.io_clusters.yaml index d96895c13..2f20e84bd 100644 --- a/deploy/crds/kosmos.io_clusters.yaml +++ b/deploy/crds/kosmos.io_clusters.yaml @@ -105,6 +105,12 @@ spec: - nodeName type: object type: array + nodeElasticIPMap: + additionalProperties: + type: string + description: NodeElasticIPMap presents mapping between nodename + in kubernetes and elasticIP + type: object useIPPool: default: false type: boolean diff --git a/deploy/crds/kosmos.io_nodeconfigs.yaml b/deploy/crds/kosmos.io_nodeconfigs.yaml index 78679263e..99db35885 100644 --- a/deploy/crds/kosmos.io_nodeconfigs.yaml +++ b/deploy/crds/kosmos.io_nodeconfigs.yaml @@ -122,6 +122,52 @@ spec: - gw type: object type: array + xfrmpolicies: + items: + properties: + dir: + type: integer + leftip: + type: string + leftnet: + type: string + reqid: + type: integer + rightip: + type: string + rightnet: + type: string + required: + - dir + - leftip + - leftnet + - reqid + - rightip + - rightnet + type: object + type: array + xfrmstates: + items: + properties: + PSK: + type: string + leftip: + type: string + reqid: + type: integer + rightip: + type: string + spi: + format: int32 + type: integer + required: + - PSK + - leftip + - reqid + - rightip + - spi + type: object + type: array type: object status: properties: diff --git a/pkg/apis/kosmos/v1alpha1/cluster_types.go b/pkg/apis/kosmos/v1alpha1/cluster_types.go index 9c1fe5a25..ef4b505fc 100644 --- a/pkg/apis/kosmos/v1alpha1/cluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/cluster_types.go @@ -91,6 +91,10 @@ type ClusterLinkOptions struct { // +optional GlobalCIDRsMap map[string]string `json:"globalCIDRsMap,omitempty"` + + // NodeElasticIPMap presents mapping between nodename in kubernetes and elasticIP + // +optional + NodeElasticIPMap map[string]string `json:"nodeElasticIPMap,omitempty"` } type ClusterTreeOptions struct { diff --git a/pkg/apis/kosmos/v1alpha1/clusternode_types.go b/pkg/apis/kosmos/v1alpha1/clusternode_types.go index 8829d55b2..6708419d4 100644 --- a/pkg/apis/kosmos/v1alpha1/clusternode_types.go +++ b/pkg/apis/kosmos/v1alpha1/clusternode_types.go @@ -8,7 +8,6 @@ import ( // +genclient // +genclient:nonNamespaced -// +kubebuilder:subresource:status // +kubebuilder:resource:scope="Cluster" // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:printcolumn:name="ROLES",type=string,JSONPath=`.spec.roles` @@ -33,6 +32,8 @@ type ClusterNodeSpec struct { // +optional IP string `json:"ip,omitempty"` // +optional + ElasticIP string `json:"elasticip,omitempty"` + // +optional IP6 string `json:"ip6,omitempty"` // +optional Roles []Role `json:"roles,omitempty"` @@ -41,6 +42,8 @@ type ClusterNodeSpec struct { } type ClusterNodeStatus struct { + // +optional + NodeStatus string `json:"nodeStatus,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/kosmos/v1alpha1/constants.go b/pkg/apis/kosmos/v1alpha1/constants.go index 56b37c550..7361a43b1 100644 --- a/pkg/apis/kosmos/v1alpha1/constants.go +++ b/pkg/apis/kosmos/v1alpha1/constants.go @@ -28,3 +28,17 @@ type DeviceType string const ( VxlanDevice DeviceType = "vxlan" ) + +const ( + DefaultPSK string = "bfd6224354977084568832b811226b3d6cff6685" + DefaultPSKPreStr = "WelcometoKosmos" + DefaultReqID int = 336 +) + +type IPSECDirection int + +const ( + IPSECIn IPSECDirection = 0 + IPSECOut IPSECDirection = 1 + IPSECFwd IPSECDirection = 2 +) diff --git a/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go b/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go index 1f4a77a02..c200ffd2e 100644 --- a/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go +++ b/pkg/apis/kosmos/v1alpha1/nodeconfig_types.go @@ -21,11 +21,13 @@ type NodeConfig struct { } type NodeConfigSpec struct { - Devices []Device `json:"devices,omitempty"` - Routes []Route `json:"routes,omitempty"` - Iptables []Iptables `json:"iptables,omitempty"` - Fdbs []Fdb `json:"fdbs,omitempty"` - Arps []Arp `json:"arps,omitempty"` + Devices []Device `json:"devices,omitempty"` + Routes []Route `json:"routes,omitempty"` + Iptables []Iptables `json:"iptables,omitempty"` + Fdbs []Fdb `json:"fdbs,omitempty"` + Arps []Arp `json:"arps,omitempty"` + XfrmPolicies []XfrmPolicy `json:"xfrmpolicies,omitempty"` + XfrmStates []XfrmState `json:"xfrmstates,omitempty"` } type NodeConfigStatus struct { @@ -101,6 +103,40 @@ func (a *Arp) Compare(v Arp) bool { a.Dev == v.Dev } +type XfrmPolicy struct { + LeftIP string `json:"leftip"` + LeftNet string `json:"leftnet"` + RightIP string `json:"rightip"` + RightNet string `json:"rightnet"` + ReqID int `json:"reqid"` + Dir int `json:"dir"` +} + +func (a *XfrmPolicy) Compare(v XfrmPolicy) bool { + return a.LeftIP == v.LeftIP && + a.LeftNet == v.LeftNet && + a.RightNet == v.RightNet && + a.RightIP == v.RightIP && + a.ReqID == v.ReqID && + a.Dir == v.Dir +} + +type XfrmState struct { + LeftIP string `json:"leftip"` + RightIP string `json:"rightip"` + ReqID int `json:"reqid"` + SPI uint32 `json:"spi"` + PSK string `json:"PSK"` +} + +func (a *XfrmState) Compare(v XfrmState) bool { + return a.LeftIP == v.LeftIP && + a.RightIP == v.RightIP && + a.ReqID == v.ReqID && + a.PSK == v.PSK && + a.SPI == v.SPI +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type NodeConfigList struct { diff --git a/pkg/clusterlink/agent-manager/network-manager/network_manager.go b/pkg/clusterlink/agent-manager/network-manager/network_manager.go index f1898fa55..4583b71d8 100644 --- a/pkg/clusterlink/agent-manager/network-manager/network_manager.go +++ b/pkg/clusterlink/agent-manager/network-manager/network_manager.go @@ -112,6 +112,21 @@ func (e *NetworkManager) Diff(oldConfig, newConfig *clusterlinkv1alpha1.NodeConf createConfig.Routes = createRecord isSame = false } + // ipsec: + if flag, deleteRecord, createRecord := compareFunc(oldConfig.XfrmPolicies, newConfig.XfrmPolicies, func(a, b clusterlinkv1alpha1.XfrmPolicy) bool { + return a.Compare(b) + }); !flag { + deleteConfig.XfrmPolicies = deleteRecord + createConfig.XfrmPolicies = createRecord + isSame = false + } + if flag, deleteRecord, createRecord := compareFunc(oldConfig.XfrmStates, newConfig.XfrmStates, func(a, b clusterlinkv1alpha1.XfrmState) bool { + return a.Compare(b) + }); !flag { + deleteConfig.XfrmStates = deleteRecord + createConfig.XfrmStates = createRecord + isSame = false + } // iptables: if flag, deleteRecord, createRecord := compareFunc(oldConfig.Iptables, newConfig.Iptables, func(a, b clusterlinkv1alpha1.Iptables) bool { return a.Compare(b) @@ -188,6 +203,18 @@ func (e *NetworkManager) WriteSys(configDiff *ConfigDiff) error { errs = errors.Wrap(err, fmt.Sprint(errs)) } } + if config.XfrmPolicies != nil { + if err := e.NetworkInterface.DeleteXfrmPolicies(config.XfrmPolicies); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.XfrmStates != nil { + if err := e.NetworkInterface.DeleteXfrmStates(config.XfrmStates); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } } if configDiff.createConfig != nil { @@ -223,6 +250,18 @@ func (e *NetworkManager) WriteSys(configDiff *ConfigDiff) error { errs = errors.Wrap(err, fmt.Sprint(errs)) } } + if config.XfrmPolicies != nil { + if err := e.NetworkInterface.AddXfrmPolicies(config.XfrmPolicies); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } + if config.XfrmStates != nil { + if err := e.NetworkInterface.AddXfrmStates(config.XfrmStates); err != nil { + klog.Warning(err) + errs = errors.Wrap(err, fmt.Sprint(errs)) + } + } } return errs @@ -254,11 +293,13 @@ func (e *NetworkManager) UpdateFromChecker() NodeConfigSyncStatus { } func printNodeConfig(data *clusterlinkv1alpha1.NodeConfigSpec) { - klog.Infof("device: ", data.Devices) - klog.Infof("Arps: ", data.Arps) - klog.Infof("Fdbs: ", data.Fdbs) - klog.Infof("Iptables: ", data.Iptables) - klog.Infof("Routes: ", data.Routes) + klog.Infof("device: %v", data.Devices) + klog.Infof("Arps: %v", data.Arps) + klog.Infof("Fdbs: %v", data.Fdbs) + klog.Infof("Iptables: %v", data.Iptables) + klog.Infof("Routes: %v", data.Routes) + klog.Infof("XfrmPolicys: %v", data.XfrmPolicies) + klog.Infof("XfrmStates: %v", data.XfrmStates) } func (e *NetworkManager) UpdateSync() NodeConfigSyncStatus { diff --git a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go index ba409c794..40391810b 100644 --- a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go +++ b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go @@ -339,10 +339,6 @@ func (c *Controller) Reconcile(key utils.QueueKey) error { } klog.Infof("start reconcile cluster %s", cluster.Name) - if cluster.Spec.ClusterLinkOptions.CNI != utils.CNITypeCalico { - klog.Infof("cluster %s cni type is %s skip reconcile", cluster.Name, cluster.Spec.ClusterLinkOptions.CNI) - return nil - } for ipPool := range c.globalExtIPPoolSet { if ipPool.cluster == cluster.Name { delete(c.globalExtIPPoolSet, ipPool) @@ -375,6 +371,10 @@ func (c *Controller) Reconcile(key utils.QueueKey) error { c.globalExtIPPoolSet[extIPPool] = struct{}{} } klog.Infof("now has %d globalIPPools", len(c.globalExtIPPoolSet)) + if cluster.Spec.ClusterLinkOptions.CNI != utils.CNITypeCalico { + klog.Infof("cluster %s cni type is %s skip reconcile", cluster.Name, cluster.Spec.ClusterLinkOptions.CNI) + return nil + } if c.iPPoolClient == nil { if cluster.Name == c.clusterName { ipPoolClient, err := c.createIPPoolClient(cluster) diff --git a/pkg/clusterlink/controllers/cluster/cluster_controller.go b/pkg/clusterlink/controllers/cluster/cluster_controller.go index a99fdcde7..4b976d805 100644 --- a/pkg/clusterlink/controllers/cluster/cluster_controller.go +++ b/pkg/clusterlink/controllers/cluster/cluster_controller.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "net" "reflect" "strings" "time" @@ -31,6 +32,8 @@ import ( clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + "github.com/kosmos.io/kosmos/pkg/kosmosctl/manifest" + "github.com/kosmos.io/kosmos/pkg/kosmosctl/util" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/keys" @@ -42,6 +45,31 @@ const ( KubeFlannelConfigMap = "kube-flannel-cfg" KubeFlannelNetworkConf = "net-conf.json" KubeFlannelIPPool = "Network" + KubeSystemNamespace = "kube-system" + InvalidService = ` +apiVersion: v1 +kind: Service +metadata: + labels: + kosmos.io/app: coredns + name: invalidsvc + namespace: {{ .Namespace }} +spec: + clusterIP: 8.8.8.8 + clusterIPs: + - 8.8.8.8 + ipFamilies: + - IPv4 + ports: + - name: dns + port: 53 + protocol: UDP + targetPort: 53 + selector: + invalid/app: null + sessionAffinity: None + type: ClusterIP +` ) type SetClusterPodCIDRFun func(cluster *clusterlinkv1alpha1.Cluster) error @@ -192,7 +220,36 @@ func (c *Controller) Reconcile(key utils.QueueKey) error { } if len(serviceCIDRS) == 0 { klog.Errorf("resolve serviceCIDRS for cluster %s failure", c.clusterName) - return err + // Todo: new a function + // Aliyun ACK don't put apiserver as a pod in cluster, try to resolve svccidr from error message + // For reference: https://stackoverflow.com/questions/44190607/how-do-you-find-the-cluster-service-cidr-of-a-kubernetes-cluster + svc, err := util.GenerateService(InvalidService, manifest.ServiceReplace{ + Namespace: KubeSystemNamespace, + }) + if err != nil { + return err + } + _, err = c.kubeClient.CoreV1().Services(KubeSystemNamespace).Create(context.Background(), svc, metav1.CreateOptions{}) + klog.Infof("Try creating invalid svc to get svccidr info ") + if err == nil { + err = c.kubeClient.CoreV1().Services(KubeSystemNamespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("Strange create invalid svc succcessfully,but delete failed,error : %v", err) + } + return fmt.Errorf("Strange create invalid svc succcessfully.") + } + + klog.Infof("Created invalid svc, the error is : %v ", err) + i := strings.Index(err.Error(), "The range of valid IPs is") + if i == -1 { + return fmt.Errorf("can't find valid service cidr in error message") + } + s := strings.TrimSpace(err.Error()[i+len("The range of valid IPs is"):]) + _, svccidr, err := net.ParseCIDR(s) + if err != nil { + return fmt.Errorf("can't find valid service cidr in error message, cidr str is %s", s) + } + serviceCIDRS = append(serviceCIDRS, svccidr.String()) } // sync pod cidr err = c.setClusterPodCIDRFun(reconcileCluster) diff --git a/pkg/clusterlink/controllers/node/node_controller.go b/pkg/clusterlink/controllers/node/node_controller.go index f0a5f8c4f..5bb74237d 100644 --- a/pkg/clusterlink/controllers/node/node_controller.go +++ b/pkg/clusterlink/controllers/node/node_controller.go @@ -115,6 +115,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{Requeue: true}, nil } + var elasticIP string + elasticIPMap := cluster.Spec.ClusterLinkOptions.NodeElasticIPMap + if len(elasticIPMap) != 0 { + if elasticIPtoParse, ok := elasticIPMap[node.Name]; ok { + _, proto := ParseIP(elasticIPtoParse) + // Now elasticIP only support IPv4 + if proto == 4 { + elasticIP = elasticIPtoParse + } + } + } err = CreateOrUpdateClusterNode(r.ClusterLinkClient, clusterNode, func(n *clusterlinkv1alpha1.ClusterNode) error { n.Spec.NodeName = node.Name n.Spec.ClusterName = r.ClusterName @@ -123,6 +134,13 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) ( n.Spec.IP = internalIP n.Spec.IP6 = internalIP6 } + n.Spec.ElasticIP = elasticIP + + if utils.NodeReady(&node) { + n.Status.NodeStatus = string(corev1.NodeReady) + } else { + n.Status.NodeStatus = "NotReady" + } return nil }) if err != nil { diff --git a/pkg/clusterlink/elector/elector.go b/pkg/clusterlink/elector/elector.go index 4d1004496..ec52ef95b 100644 --- a/pkg/clusterlink/elector/elector.go +++ b/pkg/clusterlink/elector/elector.go @@ -2,12 +2,16 @@ package elector import ( "context" + "net" "os" + "sort" + apicorev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/node" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/utils" "github.com/kosmos.io/kosmos/pkg/utils/role" @@ -32,12 +36,61 @@ func (e *Elector) EnsureGateWayRole() error { if err != nil { return err } - _, err = e.controlPanelClient.KosmosV1alpha1().Clusters().Get(context.TODO(), e.clusterName, metav1.GetOptions{}) + cluster, err := e.controlPanelClient.KosmosV1alpha1().Clusters().Get(context.TODO(), e.clusterName, metav1.GetOptions{}) if err != nil { return err } + if len(cluster.Spec.ClusterLinkOptions.NodeElasticIPMap) > 0 { + var readyNodes = make([]string, 0, 5) + currentNodeName := os.Getenv(utils.EnvNodeName) + elasticIPMap := cluster.Spec.ClusterLinkOptions.NodeElasticIPMap + isCurrentNodeWithEIP := false + needReelect := true + + for nodeName := range elasticIPMap { + if nodeName == currentNodeName { + isCurrentNodeWithEIP = true + break + } + } + for nodeName := range elasticIPMap { + if net.ParseIP(elasticIPMap[nodeName]) == nil { + klog.Errorf("elasticIP %s is invalid", elasticIPMap[nodeName]) + continue + } + clusternode, err := e.controlPanelClient.KosmosV1alpha1().ClusterNodes().Get(context.TODO(), + node.ClusterNodeName(e.clusterName, nodeName), metav1.GetOptions{}) + if err != nil { + klog.Errorf("node %s is invalid: %v", nodeName, err) + continue + } + if len(clusternode.Status.NodeStatus) > 0 && + clusternode.Status.NodeStatus == string(apicorev1.NodeReady) { + if clusternode.IsGateway() { + needReelect = false + e.nodeName = clusternode.Spec.NodeName + break + } + readyNodes = append(readyNodes, nodeName) + } + } + + if needReelect { + if !isCurrentNodeWithEIP && len(readyNodes) > 0 { + sort.Strings(readyNodes) + e.nodeName = readyNodes[0] + } else { + e.nodeName = os.Getenv(utils.EnvNodeName) + } + } + } else { + e.nodeName = os.Getenv(utils.EnvNodeName) + } + modifyNodes := e.genModifyNode(clusterNodes.Items) - klog.Infof("%d node need modify", len(modifyNodes)) + if len(modifyNodes) > 0 { + klog.Infof("%d node need modify", len(modifyNodes)) + } for i := range modifyNodes { node := modifyNodes[i] _, err := e.controlPanelClient.KosmosV1alpha1().ClusterNodes().Update(context.TODO(), &node, metav1.UpdateOptions{}) @@ -56,12 +109,12 @@ func (e *Elector) genModifyNode(clusterNodes []v1alpha1.ClusterNode) []v1alpha1. clusterNode := clusterNodes[i] isGateWay := clusterNode.IsGateway() isSameCluster := clusterNode.Spec.ClusterName == e.clusterName - isCurrentNode := clusterNode.Spec.NodeName == e.nodeName + isNewGwNode := clusterNode.Spec.NodeName == e.nodeName if isSameCluster { - if !isCurrentNode && isGateWay { + if !isNewGwNode && isGateWay { role.RemoveRole(&clusterNode, v1alpha1.RoleGateway) modifyNodes = append(modifyNodes, clusterNode) - } else if isCurrentNode && !isGateWay { + } else if isNewGwNode && !isGateWay { role.AddRole(&clusterNode, v1alpha1.RoleGateway) modifyNodes = append(modifyNodes, clusterNode) } diff --git a/pkg/clusterlink/network-manager/handlers/nodeconfig.go b/pkg/clusterlink/network-manager/handlers/nodeconfig.go index e352c6671..d1f1f5165 100644 --- a/pkg/clusterlink/network-manager/handlers/nodeconfig.go +++ b/pkg/clusterlink/network-manager/handlers/nodeconfig.go @@ -12,11 +12,13 @@ import ( // NodeConfig network configuration of the node type NodeConfig struct { - Devices []v1alpha1.Device `json:"devices,omitempty"` - Routes []v1alpha1.Route `json:"routes,omitempty"` - Iptables []v1alpha1.Iptables `json:"iptables,omitempty"` - Fdbs []v1alpha1.Fdb `json:"fdbs,omitempty"` - Arps []v1alpha1.Arp `json:"arps,omitempty"` + Devices []v1alpha1.Device `json:"devices,omitempty"` + Routes []v1alpha1.Route `json:"routes,omitempty"` + Iptables []v1alpha1.Iptables `json:"iptables,omitempty"` + Fdbs []v1alpha1.Fdb `json:"fdbs,omitempty"` + Arps []v1alpha1.Arp `json:"arps,omitempty"` + XfrmPolicies []v1alpha1.XfrmPolicy `json:"xfrmpolicies,omitempty"` + XfrmStates []v1alpha1.XfrmState `json:"xfrmstates,omitempty"` } func (c *NodeConfig) ToString() string { @@ -33,11 +35,13 @@ func (c *NodeConfig) ToJson() ([]byte, error) { func (c *NodeConfig) ConvertToNodeConfigSpec() v1alpha1.NodeConfigSpec { return v1alpha1.NodeConfigSpec{ - Devices: c.Devices, - Routes: c.Routes, - Iptables: c.Iptables, - Fdbs: c.Fdbs, - Arps: c.Arps, + Devices: c.Devices, + Routes: c.Routes, + Iptables: c.Iptables, + Fdbs: c.Fdbs, + Arps: c.Arps, + XfrmStates: c.XfrmStates, + XfrmPolicies: c.XfrmPolicies, } } diff --git a/pkg/clusterlink/network-manager/handlers/pod_routes.go b/pkg/clusterlink/network-manager/handlers/pod_routes.go index 2d208f6df..464ce8e54 100644 --- a/pkg/clusterlink/network-manager/handlers/pod_routes.go +++ b/pkg/clusterlink/network-manager/handlers/pod_routes.go @@ -1,7 +1,13 @@ package handlers import ( + "bytes" + "crypto/md5" //nolint:gosec + "encoding/hex" + "fmt" + "hash/crc32" "net" + "os" "k8s.io/klog/v2" @@ -134,11 +140,81 @@ func BuildRoutes(ctx *Context, target *v1alpha1.ClusterNode, cidrs []string) { } if n.IsGateway() || srcCluster.IsP2P() { - ctx.Results[n.Name].Routes = append(ctx.Results[n.Name].Routes, v1alpha1.Route{ - CIDR: cidr, - Gw: targetIP.String(), - Dev: vxBridge, - }) + klog.Infof("Chekc node %s is gateway,t ElasticIP:%s,n ElasticIP: %s", n.Spec.NodeName, target.Spec.ElasticIP, n.Spec.ElasticIP) + if len(target.Spec.ElasticIP) > 0 && len(n.Spec.ElasticIP) > 0 { + nCluster := ctx.Filter.GetClusterByName(n.Spec.ClusterName) + var nPodCIDRs []string + if nCluster.IsP2P() { + nPodCIDRs = n.Spec.PodCIDRs + } else { + nPodCIDRs = nCluster.Status.ClusterLinkStatus.PodCIDRs + } + nPodCIDRs = FilterByIPFamily(nPodCIDRs, nCluster.Spec.ClusterLinkOptions.IPFamily) + nPodCIDRs = ConvertToGlobalCIDRs(nPodCIDRs, nCluster.Spec.ClusterLinkOptions.GlobalCIDRsMap) + var bt bytes.Buffer + if n.Name > target.Name { + bt.WriteString(n.Name) + bt.WriteString(target.Name) + } else { + bt.WriteString(target.Name) + bt.WriteString(n.Name) + } + spi := crc32.ChecksumIEEE(bt.Bytes()) + + psk_pre := md5.Sum([]byte(os.Getenv("PSK_PRE_STR"))) //nolint:gosec + psk_suffix := fmt.Sprintf("%08x", spi) + psk_suffix_byte, _ := hex.DecodeString(psk_suffix) + psk_byte := append(psk_pre[:], psk_suffix_byte...) + psk := hex.EncodeToString(psk_byte) + klog.Infof("psk_suffix: %s,psk: %s", psk_suffix, psk) + + ctx.Results[n.Name].XfrmStates = append(ctx.Results[n.Name].XfrmStates, v1alpha1.XfrmState{ + LeftIP: n.Spec.IP, + RightIP: target.Spec.ElasticIP, + ReqID: v1alpha1.DefaultReqID, + PSK: psk, + SPI: spi, + }) + ctx.Results[n.Name].XfrmStates = append(ctx.Results[n.Name].XfrmStates, v1alpha1.XfrmState{ + RightIP: n.Spec.IP, + LeftIP: target.Spec.ElasticIP, + ReqID: v1alpha1.DefaultReqID, + PSK: psk, + SPI: spi, + }) + for _, ncidr := range nPodCIDRs { + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: n.Spec.IP, + LeftNet: ncidr, + RightIP: target.Spec.ElasticIP, + RightNet: cidr, + ReqID: v1alpha1.DefaultReqID, + Dir: int(v1alpha1.IPSECOut), + }) + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: target.Spec.ElasticIP, + LeftNet: cidr, + RightIP: n.Spec.IP, + RightNet: ncidr, + ReqID: v1alpha1.DefaultReqID, + Dir: int(v1alpha1.IPSECIn), + }) + ctx.Results[n.Name].XfrmPolicies = append(ctx.Results[n.Name].XfrmPolicies, v1alpha1.XfrmPolicy{ + LeftIP: target.Spec.ElasticIP, + LeftNet: cidr, + RightIP: n.Spec.IP, + RightNet: ncidr, + ReqID: v1alpha1.DefaultReqID, + Dir: int(v1alpha1.IPSECFwd), + }) + } + } else { + ctx.Results[n.Name].Routes = append(ctx.Results[n.Name].Routes, v1alpha1.Route{ + CIDR: cidr, + Gw: targetIP.String(), + Dev: vxBridge, + }) + } continue } diff --git a/pkg/clusterlink/network/adapter.go b/pkg/clusterlink/network/adapter.go index c8ce08b80..e45cbb6f9 100644 --- a/pkg/clusterlink/network/adapter.go +++ b/pkg/clusterlink/network/adapter.go @@ -2,10 +2,13 @@ package network import ( "fmt" + "net" "github.com/pkg/errors" + "github.com/vishvananda/netlink" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" ) @@ -51,6 +54,20 @@ func (n *DefaultNetWork) LoadSysConfig() (*clusterlinkv1alpha1.NodeConfigSpec, e nodeConfigSpec.Arps = arps } + xfrmpolicies, err := loadXfrmPolicy() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.XfrmPolicies = xfrmpolicies + } + + xfrmstates, err := loadXfrmState() + if err != nil { + errs = errors.Wrap(err, fmt.Sprint(errs)) + } else { + nodeConfigSpec.XfrmStates = xfrmstates + } + return nodeConfigSpec, errs } @@ -124,6 +141,14 @@ func (n *DefaultNetWork) UpdateDevices([]clusterlinkv1alpha1.Device) error { return ErrNotImplemented } +func (n *DefaultNetWork) UpdateXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error { + return ErrNotImplemented +} + +func (n *DefaultNetWork) UpdateXfrmStates([]clusterlinkv1alpha1.XfrmState) error { + return ErrNotImplemented +} + func (n *DefaultNetWork) AddArps(arps []clusterlinkv1alpha1.Arp) error { var errs error for _, arp := range arps { @@ -164,6 +189,88 @@ func (n *DefaultNetWork) AddRoutes(routes []clusterlinkv1alpha1.Route) error { return errs } +// For reference: +// https://github.com/flannel-io/flannel +func (n *DefaultNetWork) AddXfrmPolicies(xfrmpolicies []clusterlinkv1alpha1.XfrmPolicy) error { + for _, xfrmpolicy := range xfrmpolicies { + srcIP := net.ParseIP(xfrmpolicy.LeftIP) + dstIP := net.ParseIP(xfrmpolicy.RightIP) + _, srcNet, _ := net.ParseCIDR(xfrmpolicy.LeftNet) + _, dstNet, _ := net.ParseCIDR(xfrmpolicy.RightNet) + reqID := xfrmpolicy.ReqID + + var err error + var xfrmpolicydir netlink.Dir + switch v1alpha1.IPSECDirection(xfrmpolicy.Dir) { + case v1alpha1.IPSECOut: + xfrmpolicydir = netlink.XFRM_DIR_OUT + case v1alpha1.IPSECIn: + xfrmpolicydir = netlink.XFRM_DIR_IN + case v1alpha1.IPSECFwd: + xfrmpolicydir = netlink.XFRM_DIR_FWD + } + err = AddXFRMPolicy(srcNet, dstNet, srcIP, dstIP, xfrmpolicydir, reqID) + if err != nil { + return fmt.Errorf("error adding ipsec out policy: %v", err) + } + } + return nil +} + +// For reference: +// https://github.com/flannel-io/flannel +func (n *DefaultNetWork) DeleteXfrmPolicies(xfrmpolicies []clusterlinkv1alpha1.XfrmPolicy) error { + for _, xfrmpolicy := range xfrmpolicies { + srcIP := net.ParseIP(xfrmpolicy.LeftIP) + dstIP := net.ParseIP(xfrmpolicy.RightIP) + _, srcNet, _ := net.ParseCIDR(xfrmpolicy.LeftNet) + _, dstNet, _ := net.ParseCIDR(xfrmpolicy.RightNet) + reqID := xfrmpolicy.ReqID + + var xfrmpolicydir netlink.Dir + switch v1alpha1.IPSECDirection(xfrmpolicy.Dir) { + case v1alpha1.IPSECOut: + xfrmpolicydir = netlink.XFRM_DIR_OUT + case v1alpha1.IPSECIn: + xfrmpolicydir = netlink.XFRM_DIR_IN + case v1alpha1.IPSECFwd: + xfrmpolicydir = netlink.XFRM_DIR_FWD + } + + err := DeleteXFRMPolicy(srcNet, dstNet, srcIP, dstIP, xfrmpolicydir, reqID) + if err != nil { + return fmt.Errorf("error deleting ipsec out policy: %v", err) + } + } + return nil +} + +func (n *DefaultNetWork) AddXfrmStates(xfrmstates []clusterlinkv1alpha1.XfrmState) error { + for _, xfrmstate := range xfrmstates { + srcIP := net.ParseIP(xfrmstate.LeftIP) + dstIP := net.ParseIP(xfrmstate.RightIP) + reqID := xfrmstate.ReqID + err := AddXFRMState(srcIP, dstIP, reqID, int(xfrmstate.SPI), xfrmstate.PSK) + if err != nil { + return fmt.Errorf("error adding ipsec state: %v", err) + } + } + return nil +} + +func (n *DefaultNetWork) DeleteXfrmStates(xfrmstates []clusterlinkv1alpha1.XfrmState) error { + for _, xfrmstate := range xfrmstates { + srcIP := net.ParseIP(xfrmstate.LeftIP) + dstIP := net.ParseIP(xfrmstate.RightIP) + reqID := xfrmstate.ReqID + err := DeleteXFRMState(srcIP, dstIP, reqID, int(xfrmstate.SPI), xfrmstate.PSK) + if err != nil { + return fmt.Errorf("error deleting ipsec state: %v", err) + } + } + return nil +} + func (n *DefaultNetWork) AddDevices(devices []clusterlinkv1alpha1.Device) error { var errs error for _, device := range devices { diff --git a/pkg/clusterlink/network/interface.go b/pkg/clusterlink/network/interface.go index 58b6a43d1..247b5500d 100644 --- a/pkg/clusterlink/network/interface.go +++ b/pkg/clusterlink/network/interface.go @@ -19,18 +19,24 @@ type NetWork interface { DeleteIptables([]clusterlinkv1alpha1.Iptables) error DeleteRoutes([]clusterlinkv1alpha1.Route) error DeleteDevices([]clusterlinkv1alpha1.Device) error + DeleteXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + DeleteXfrmStates([]clusterlinkv1alpha1.XfrmState) error UpdateArps([]clusterlinkv1alpha1.Arp) error UpdateFdbs([]clusterlinkv1alpha1.Fdb) error UpdateIptables([]clusterlinkv1alpha1.Iptables) error UpdateRoutes([]clusterlinkv1alpha1.Route) error UpdateDevices([]clusterlinkv1alpha1.Device) error + UpdateXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + UpdateXfrmStates([]clusterlinkv1alpha1.XfrmState) error AddArps([]clusterlinkv1alpha1.Arp) error AddFdbs([]clusterlinkv1alpha1.Fdb) error AddIptables([]clusterlinkv1alpha1.Iptables) error AddRoutes([]clusterlinkv1alpha1.Route) error AddDevices([]clusterlinkv1alpha1.Device) error + AddXfrmPolicies([]clusterlinkv1alpha1.XfrmPolicy) error + AddXfrmStates([]clusterlinkv1alpha1.XfrmState) error InitSys() diff --git a/pkg/clusterlink/network/xfrm_policy.go b/pkg/clusterlink/network/xfrm_policy.go new file mode 100644 index 000000000..f9d56fbcc --- /dev/null +++ b/pkg/clusterlink/network/xfrm_policy.go @@ -0,0 +1,180 @@ +package network + +import ( + "encoding/hex" + "errors" + "fmt" + "net" + "syscall" + + "github.com/vishvananda/netlink" + log "k8s.io/klog/v2" + + clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" +) + +// For reference: +// https://github.com/flannel-io/flannel +func AddXFRMPolicy(srcNet, dstNet *net.IPNet, srcIP, dstIP net.IP, dir netlink.Dir, reqID int) error { + policy := &netlink.XfrmPolicy{ + Src: srcNet, + Dst: dstNet, + Dir: dir, + } + + tmpl := netlink.XfrmPolicyTmpl{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Reqid: reqID, + } + + policy.Tmpls = append(policy.Tmpls, tmpl) + + if existingPolicy, err := netlink.XfrmPolicyGet(policy); err != nil { + if errors.Is(err, syscall.ENOENT) { + log.Infof("Adding ipsec policy: %+v", tmpl) + if err := netlink.XfrmPolicyAdd(policy); err != nil { + return fmt.Errorf("error adding policy: %+v err: %v", policy, err) + } + } else { + return fmt.Errorf("error getting policy: %+v err: %v", policy, err) + } + } else { + log.Infof("Updating ipsec policy %+v with %+v", existingPolicy, policy) + if err := netlink.XfrmPolicyUpdate(policy); err != nil { + return fmt.Errorf("error updating policy: %+v err: %v", policy, err) + } + } + return nil +} + +func DeleteXFRMPolicy(srcNet, dstNet *net.IPNet, srcIP, dstIP net.IP, dir netlink.Dir, reqID int) error { + policy := netlink.XfrmPolicy{ + Src: srcNet, + Dst: dstNet, + Dir: dir, + } + + tmpl := netlink.XfrmPolicyTmpl{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Reqid: reqID, + } + + log.Infof("Deleting ipsec policy: %+v", tmpl) + + policy.Tmpls = append(policy.Tmpls, tmpl) + + if err := netlink.XfrmPolicyDel(&policy); err != nil { + return fmt.Errorf("error deleting policy: %+v err: %v", policy, err) + } + + return nil +} + +func AddXFRMState(srcIP, dstIP net.IP, reqID int, spi int, psk string) error { + k, _ := hex.DecodeString(psk) + state := netlink.XfrmState{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Spi: spi, + Reqid: reqID, + Aead: &netlink.XfrmStateAlgo{ + Name: "rfc4106(gcm(aes))", + Key: k, + ICVLen: 128, + }, + } + + if existingState, err := netlink.XfrmStateGet(&state); err != nil { + if errors.Is(err, syscall.ESRCH) || errors.Is(err, syscall.ENOENT) { + log.Infof("Adding xfrm state: %+v", state) + if err := netlink.XfrmStateAdd(&state); err != nil { + return fmt.Errorf("error adding state: %+v err: %v", state, err) + } + } else { + return fmt.Errorf("error getting state: %+v err: %v", state, err) + } + } else { + log.Infof("Updating xfrm state %+v with %+v", existingState, state) + if err := netlink.XfrmStateUpdate(&state); err != nil { + return fmt.Errorf("error updating state: %+v err: %v", state, err) + } + } + return nil +} + +func DeleteXFRMState(srcIP, dstIP net.IP, reqID int, spi int, psk string) error { + k, _ := hex.DecodeString(psk) + state := netlink.XfrmState{ + Src: srcIP, + Dst: dstIP, + Proto: netlink.XFRM_PROTO_ESP, + Mode: netlink.XFRM_MODE_TUNNEL, + Spi: spi, + Reqid: reqID, + Aead: &netlink.XfrmStateAlgo{ + Name: "rfc4106(gcm(aes))", + Key: k, + ICVLen: 128, + }, + } + log.Infof("Deleting ipsec state: %+v", state) + err := netlink.XfrmStateDel(&state) + if err != nil { + return fmt.Errorf("error delete xfrm state: %+v err: %v", state, err) + } + return nil +} + +func ListXfrmPolicy() ([]clusterlinkv1alpha1.XfrmPolicy, error) { + xfrmpolicies, err := netlink.XfrmPolicyList(netlink.FAMILY_ALL) + if err != nil { + return nil, fmt.Errorf("error list xfrm policy: %v", err) + } + var ret []clusterlinkv1alpha1.XfrmPolicy + for _, policy := range xfrmpolicies { + ret = append(ret, clusterlinkv1alpha1.XfrmPolicy{ + LeftIP: policy.Tmpls[0].Src.String(), + LeftNet: policy.Src.String(), + RightIP: policy.Tmpls[0].Dst.String(), + RightNet: policy.Dst.String(), + ReqID: policy.Tmpls[0].Reqid, + Dir: int(policy.Dir), + }) + } + return ret, nil +} + +func ListXfrmState() ([]clusterlinkv1alpha1.XfrmState, error) { + xfrmstates, err := netlink.XfrmStateList(netlink.FAMILY_ALL) + if err != nil { + return nil, fmt.Errorf("error list xfrm state: %v", err) + } + var ret []clusterlinkv1alpha1.XfrmState + for _, state := range xfrmstates { + k := hex.EncodeToString(state.Aead.Key) + ret = append(ret, clusterlinkv1alpha1.XfrmState{ + LeftIP: state.Src.String(), + RightIP: state.Dst.String(), + ReqID: state.Reqid, + PSK: k, + SPI: uint32(state.Spi), + }) + } + return ret, nil +} + +func loadXfrmPolicy() ([]clusterlinkv1alpha1.XfrmPolicy, error) { + return ListXfrmPolicy() +} + +func loadXfrmState() ([]clusterlinkv1alpha1.XfrmState, error) { + return ListXfrmState() +} diff --git a/pkg/kosmosctl/install/install.go b/pkg/kosmosctl/install/install.go index 6c7703e3b..ff4ef499b 100644 --- a/pkg/kosmosctl/install/install.go +++ b/pkg/kosmosctl/install/install.go @@ -3,6 +3,7 @@ package install import ( "context" "fmt" + "net" "os" "path/filepath" @@ -61,6 +62,7 @@ type CommandInstallOptions struct { NetworkType string IpFamily string UseProxy string + NodeElasticIP map[string]string KosmosClient versioned.Interface K8sClient kubernetes.Interface @@ -100,6 +102,7 @@ func NewCmdInstall(f ctlutil.Factory) *cobra.Command { flags.StringVar(&o.IpFamily, "ip-family", string(v1alpha1.IPFamilyTypeIPV4), "Specify the IP protocol version used by network devices, common IP families include IPv4 and IPv6.") flags.StringVar(&o.UseProxy, "use-proxy", "false", "Set whether to enable proxy.") flags.IntVarP(&o.WaitTime, "wait-time", "", utils.DefaultWaitTime, "Wait the specified time for the Kosmos install ready.") + flags.StringToStringVar(&o.NodeElasticIP, "node-elasticip", nil, "Set cluster node with elastic ip.") flags.StringVar(&o.CertEncode, "cert-encode", cert.GetCrtEncode(), "cert base64 string for node server.") flags.StringVar(&o.KeyEncode, "key-encode", cert.GetKeyEncode(), "key base64 string for node server.") @@ -154,6 +157,17 @@ func (o *CommandInstallOptions) Validate() error { return fmt.Errorf("kosmosctl install validate error, namespace is not valid") } + validationErr := "kosmosctl install validate error" + for nodeName, elasticIP := range o.NodeElasticIP { + _, err := o.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("%s, node %s is invalid: %v", validationErr, nodeName, err) + } + if net.ParseIP(elasticIP) == nil { + return fmt.Errorf("%s, ElasticIP %s is invalid", validationErr, elasticIP) + } + } + return nil } @@ -293,6 +307,7 @@ func (o *CommandInstallOptions) runClusterlink() error { Namespace: o.Namespace, ImageRepository: o.ImageRegistry, Version: version.GetReleaseVersion().PatchRelease(), + PSKPreStr: v1alpha1.DefaultPSKPreStr, }) if err != nil { return err @@ -580,6 +595,7 @@ func (o *CommandInstallOptions) createControlCluster() error { NetworkType: o.NetworkType, IpFamily: o.IpFamily, UseProxy: o.UseProxy, + NodeElasticIP: o.NodeElasticIP, } err = joinOptions.Run(clusterArgs) @@ -609,6 +625,7 @@ func (o *CommandInstallOptions) createControlCluster() error { case utils.DefaultIPv6: controlCluster.Spec.ClusterLinkOptions.IPFamily = v1alpha1.IPFamilyTypeIPV6 } + controlCluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP _, err = o.KosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), controlCluster, metav1.UpdateOptions{}) if err != nil { klog.Infof("ControlCluster-Link: ", controlCluster) @@ -674,6 +691,7 @@ func (o *CommandInstallOptions) createControlCluster() error { NetworkType: o.NetworkType, IpFamily: o.IpFamily, UseProxy: o.UseProxy, + NodeElasticIP: o.NodeElasticIP, EnableTree: true, } @@ -705,6 +723,7 @@ func (o *CommandInstallOptions) createControlCluster() error { case utils.DefaultIPv6: controlCluster.Spec.ClusterLinkOptions.IPFamily = v1alpha1.IPFamilyTypeIPV6 } + controlCluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP _, err = o.KosmosClient.KosmosV1alpha1().Clusters().Update(context.TODO(), controlCluster, metav1.UpdateOptions{}) if err != nil { klog.Infof("ControlCluster-All: ", controlCluster) diff --git a/pkg/kosmosctl/join/join.go b/pkg/kosmosctl/join/join.go index d8018a8e4..3e5bac29f 100644 --- a/pkg/kosmosctl/join/join.go +++ b/pkg/kosmosctl/join/join.go @@ -3,6 +3,7 @@ package join import ( "context" "fmt" + "net" "os" "path/filepath" @@ -59,6 +60,7 @@ type CommandJoinOptions struct { NetworkType string IpFamily string UseProxy string + NodeElasticIP map[string]string EnableTree bool LeafModel string @@ -104,6 +106,7 @@ func NewCmdJoin(f ctlutil.Factory) *cobra.Command { flags.BoolVar(&o.EnableTree, "enable-tree", false, "Turn on clustertree.") flags.StringVar(&o.LeafModel, "leaf-model", "", "Set leaf cluster model, which supports one-to-one model.") flags.IntVarP(&o.WaitTime, "wait-time", "", utils.DefaultWaitTime, "Wait the specified time for the Kosmos install ready.") + flags.StringToStringVar(&o.NodeElasticIP, "node-elasticip", nil, "Set cluster node with elastic ip.") return cmd } @@ -174,6 +177,17 @@ func (o *CommandJoinOptions) Validate(args []string) error { return fmt.Errorf("kosmosctl join validate error, namespace is not valid") } + validationErr := "kosmosctl join validate error" + for nodeName, elasticIP := range o.NodeElasticIP { + _, err := o.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("%s, node %s is invalid: %v", validationErr, nodeName, err) + } + if net.ParseIP(elasticIP) == nil { + return fmt.Errorf("%s, ElasticIP %s is invalid", validationErr, elasticIP) + } + } + switch args[0] { case "cluster": _, err := o.KosmosClient.KosmosV1alpha1().Clusters().Get(context.TODO(), o.Name, metav1.GetOptions{}) @@ -225,8 +239,9 @@ func (o *CommandJoinOptions) runCluster() error { IP: "210.0.0.0/8", IP6: "9470::0/16", }, - NetworkType: v1alpha1.NetWorkTypeGateWay, - IPFamily: v1alpha1.IPFamilyTypeIPV4, + NetworkType: v1alpha1.NetWorkTypeGateWay, + IPFamily: v1alpha1.IPFamilyTypeIPV4, + NodeElasticIPMap: o.NodeElasticIP, }, ClusterTreeOptions: &v1alpha1.ClusterTreeOptions{ Enable: o.EnableTree, @@ -253,6 +268,7 @@ func (o *CommandJoinOptions) runCluster() error { cluster.Spec.ClusterLinkOptions.DefaultNICName = o.DefaultNICName cluster.Spec.ClusterLinkOptions.CNI = o.CNI + cluster.Spec.ClusterLinkOptions.NodeElasticIPMap = o.NodeElasticIP } if o.EnableTree { diff --git a/pkg/kosmosctl/manifest/manifest_crds.go b/pkg/kosmosctl/manifest/manifest_crds.go index 1174c1550..7a3cabf2b 100644 --- a/pkg/kosmosctl/manifest/manifest_crds.go +++ b/pkg/kosmosctl/manifest/manifest_crds.go @@ -347,6 +347,8 @@ spec: properties: clusterName: type: string + elasticip: + type: string interfaceName: type: string ip: @@ -365,16 +367,17 @@ spec: type: array type: object status: + properties: + nodeStatus: + type: string type: object required: - spec type: object served: true storage: true - subresources: - status: {} + subresources: {} ` - const Cluster = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -482,6 +485,12 @@ spec: - nodeName type: object type: array + nodeElasticIPMap: + additionalProperties: + type: string + description: NodeElasticIPMap presents mapping between nodename + in kubernetes and elasticIP + type: object useIPPool: default: false type: boolean @@ -652,7 +661,6 @@ spec: storage: true subresources: {} ` - const NodeConfig = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -777,6 +785,52 @@ spec: - gw type: object type: array + xfrmpolicies: + items: + properties: + dir: + type: integer + leftip: + type: string + leftnet: + type: string + reqid: + type: integer + rightip: + type: string + rightnet: + type: string + required: + - dir + - leftip + - leftnet + - reqid + - rightip + - rightnet + type: object + type: array + xfrmstates: + items: + properties: + PSK: + type: string + leftip: + type: string + reqid: + type: integer + rightip: + type: string + spi: + format: int32 + type: integer + required: + - PSK + - leftip + - reqid + - rightip + - spi + type: object + type: array type: object status: properties: @@ -795,7 +849,6 @@ spec: subresources: status: {} ` - const DaemonSet = `--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition diff --git a/pkg/kosmosctl/manifest/manifest_deployments.go b/pkg/kosmosctl/manifest/manifest_deployments.go index f888726b4..2d3e8fbc6 100644 --- a/pkg/kosmosctl/manifest/manifest_deployments.go +++ b/pkg/kosmosctl/manifest/manifest_deployments.go @@ -34,6 +34,9 @@ spec: requests: cpu: 500m memory: 500Mi + env: + - name: PSK_PRE_STR + value: "{{ .PSKPreStr }}" ` KosmosOperatorDeployment = ` @@ -268,5 +271,6 @@ type DeploymentReplace struct { ImageRepository string Version string - UseProxy string + UseProxy string + PSKPreStr string }