diff --git a/deploy/crds/kosmos.io_globalnodes.yaml b/deploy/crds/kosmos.io_globalnodes.yaml new file mode 100644 index 000000000..d51644bad --- /dev/null +++ b/deploy/crds/kosmos.io_globalnodes.yaml @@ -0,0 +1,93 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.0 + creationTimestamp: null + name: globalnodes.kosmos.io +spec: + group: kosmos.io + names: + kind: GlobalNode + listKind: GlobalNodeList + plural: globalnodes + singular: globalnode + scope: Cluster + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: Spec is the specification for the behaviour of the GlobalNodeSpec. + properties: + labels: + additionalProperties: + type: string + description: Set is a map of label:value. It implements Labels. + type: object + nodeIP: + type: string + state: + default: free + type: string + type: object + status: + properties: + conditions: + description: 'Conditions is an array of current observed node conditions. + More info: https://kubernetes.io/docs/concepts/nodes/node/#condition' + items: + description: NodeCondition contains condition information for a + node. + properties: + lastHeartbeatTime: + description: Last time we got an update on a given condition. + format: date-time + type: string + lastTransitionTime: + description: Last time the condition transit from one status + to another. + format: date-time + type: string + message: + description: Human readable message indicating details about + last transition. + type: string + reason: + description: (brief) reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of node condition. + type: string + required: + - status + - type + type: object + type: array + virtualCluster: + type: string + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index e981c40e3..e91be9f95 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -105,6 +105,8 @@ spec: the kubernetes's control plane format: int32 type: integer + required: + - nodeCount type: object type: array promoteResources: diff --git a/global_node.yaml b/global_node.yaml new file mode 100644 index 000000000..183885e7d --- /dev/null +++ b/global_node.yaml @@ -0,0 +1,9 @@ +apiVersion: kosmos.io/v1alpha1 +kind: GlobalNode +metadata: + name: example-globalnode +spec: + nodeIP: "12.7" + labels: # Optional: fill in key-value pairs as needed + exampleKey: "exampleValue" + diff --git a/hack/generate_globalnode.sh b/hack/generate_globalnode.sh new file mode 100755 index 000000000..0285a3464 --- /dev/null +++ b/hack/generate_globalnode.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +# 确保提供了kubeconfig路径 +if [ -z "$KUBECONFIG" ]; then + echo "KUBECONFIG环境变量未设置." + exit 1 +fi + +# 获取所有node的名称 +nodes=$(kubectl get nodes -o jsonpath='{.items[*].metadata.name}') +# 遍历所有node +for node in ${nodes}; do + # 获取node的IP地址 + nodeIP=$(kubectl get node ${node} -o jsonpath='{.status.addresses[0].address}') + # 获取node的标签,并转换为GlobalNode需要的格式 + labels=$(kubectl get node ${node} -o jsonpath='{.metadata.labels}') + labelsFormatted=$(echo "$labels" | jq -r 'to_entries | .[] | " \(.key): \(.value)"') + # 使用echo和管道将YAML直接传递给kubectl apply -f - + echo " +apiVersion: kosmos.io/v1alpha1 +kind: GlobalNode +metadata: + name: ${node}-globalnode +spec: + nodeIP: \"${nodeIP}\" + labels: +$(echo "${labelsFormatted}" | sed 's/=/": "/g' | awk '{print " " $0}') +" | kubectl apply -f - + +done diff --git a/pkg/apis/kosmos/v1alpha1/clusterpodconvertpolicy_types.go b/pkg/apis/kosmos/v1alpha1/clusterpodconvertpolicy_types.go index ed936d6a7..317fff0c1 100644 --- a/pkg/apis/kosmos/v1alpha1/clusterpodconvertpolicy_types.go +++ b/pkg/apis/kosmos/v1alpha1/clusterpodconvertpolicy_types.go @@ -28,7 +28,7 @@ type ClusterPodConvertPolicySpec struct { // A label query over a set of resources. // If name is not empty, LeafNodeSelector will be ignored. - // +option + // +optional LeafNodeSelector *metav1.LabelSelector `json:"leafNodeSelector,omitempty"` // Converters are some converter for convert pod when pod synced from root cluster to leaf cluster diff --git a/pkg/apis/kosmos/v1alpha1/global_node_types.go b/pkg/apis/kosmos/v1alpha1/global_node_types.go new file mode 100644 index 000000000..895fc408f --- /dev/null +++ b/pkg/apis/kosmos/v1alpha1/global_node_types.go @@ -0,0 +1,63 @@ +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// +genclient +// +genclient:nonNamespaced +// +kubebuilder:resource:scope="Cluster" +// +kubebuilder:subresource:status +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type GlobalNode struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec is the specification for the behaviour of the GlobalNodeSpec. + // +required + Spec GlobalNodeSpec `json:"spec"` + + // +optional + Status GlobalNodeStatus `json:"status,omitempty"` +} + +type GlobalNodeSpec struct { + // +optional + NodeIP string `json:"nodeIP,omitempty"` + + // +kubebuilder:default=free + // +optional + State NodeState `json:"state,omitempty"` + + // +optional + Labels labels.Set `json:"labels,omitempty"` +} + +type NodeState string + +const ( + NodeInUse NodeState = "occupied" + NodeFreeState NodeState = "free" + NodeReserved NodeState = "reserved" +) + +type GlobalNodeStatus struct { + // +optional + VirtualCluster string `json:"virtualCluster,omitempty"` + + // Conditions is an array of current observed node conditions. + // More info: https://kubernetes.io/docs/concepts/nodes/node/#condition + // +optional + Conditions []corev1.NodeCondition `json:"conditions,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type GlobalNodeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []GlobalNode `json:"items"` +} diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index b375d0cb1..db6115c47 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -50,7 +50,7 @@ type VirtualClusterSpec struct { ExternalIP string `json:"externalIP,omitempty"` // PromotePolicies definites the policies for promote to the kubernetes's control plane - // +optional + // +required PromotePolicies []PromotePolicy `json:"promotePolicies,omitempty"` // PromoteResources definites the resources for promote to the kubernetes's control plane, @@ -65,8 +65,8 @@ type PromotePolicy struct { LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` // NodeCount is the number of nodes to promote to the kubernetes's control plane - // +optional - NodeCount *int32 `json:"nodeCount,omitempty"` + // +required + NodeCount int32 `json:"nodeCount"` } type PromoteResources struct { diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go index 968910849..9230fa45e 100644 --- a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go @@ -9,6 +9,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -830,6 +831,113 @@ func (in *Fdb) DeepCopy() *Fdb { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GlobalNode) DeepCopyInto(out *GlobalNode) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalNode. +func (in *GlobalNode) DeepCopy() *GlobalNode { + if in == nil { + return nil + } + out := new(GlobalNode) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *GlobalNode) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GlobalNodeList) DeepCopyInto(out *GlobalNodeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]GlobalNode, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalNodeList. +func (in *GlobalNodeList) DeepCopy() *GlobalNodeList { + if in == nil { + return nil + } + out := new(GlobalNodeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *GlobalNodeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GlobalNodeSpec) DeepCopyInto(out *GlobalNodeSpec) { + *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(labels.Set, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalNodeSpec. +func (in *GlobalNodeSpec) DeepCopy() *GlobalNodeSpec { + if in == nil { + return nil + } + out := new(GlobalNodeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GlobalNodeStatus) DeepCopyInto(out *GlobalNodeStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.NodeCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GlobalNodeStatus. +func (in *GlobalNodeStatus) DeepCopy() *GlobalNodeStatus { + if in == nil { + return nil + } + out := new(GlobalNodeStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Iptables) DeepCopyInto(out *Iptables) { *out = *in @@ -1327,11 +1435,6 @@ func (in *PromotePolicy) DeepCopyInto(out *PromotePolicy) { *out = new(metav1.LabelSelector) (*in).DeepCopyInto(*out) } - if in.NodeCount != nil { - in, out := &in.NodeCount, &out.NodeCount - *out = new(int32) - **out = **in - } return } diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.register.go b/pkg/apis/kosmos/v1alpha1/zz_generated.register.go index 9439e97d8..42e8b42cb 100644 --- a/pkg/apis/kosmos/v1alpha1/zz_generated.register.go +++ b/pkg/apis/kosmos/v1alpha1/zz_generated.register.go @@ -54,6 +54,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &DaemonSetList{}, &DistributionPolicy{}, &DistributionPolicyList{}, + &GlobalNode{}, + &GlobalNodeList{}, &Knode{}, &KnodeList{}, &NodeConfig{}, diff --git a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_globalnode.go b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_globalnode.go new file mode 100644 index 000000000..cc8ce6d6e --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_globalnode.go @@ -0,0 +1,117 @@ +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeGlobalNodes implements GlobalNodeInterface +type FakeGlobalNodes struct { + Fake *FakeKosmosV1alpha1 +} + +var globalnodesResource = schema.GroupVersionResource{Group: "kosmos.io", Version: "v1alpha1", Resource: "globalnodes"} + +var globalnodesKind = schema.GroupVersionKind{Group: "kosmos.io", Version: "v1alpha1", Kind: "GlobalNode"} + +// Get takes name of the globalNode, and returns the corresponding globalNode object, and an error if there is any. +func (c *FakeGlobalNodes) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.GlobalNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(globalnodesResource, name), &v1alpha1.GlobalNode{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.GlobalNode), err +} + +// List takes label and field selectors, and returns the list of GlobalNodes that match those selectors. +func (c *FakeGlobalNodes) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.GlobalNodeList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(globalnodesResource, globalnodesKind, opts), &v1alpha1.GlobalNodeList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.GlobalNodeList{ListMeta: obj.(*v1alpha1.GlobalNodeList).ListMeta} + for _, item := range obj.(*v1alpha1.GlobalNodeList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested globalNodes. +func (c *FakeGlobalNodes) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(globalnodesResource, opts)) +} + +// Create takes the representation of a globalNode and creates it. Returns the server's representation of the globalNode, and an error, if there is any. +func (c *FakeGlobalNodes) Create(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.CreateOptions) (result *v1alpha1.GlobalNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(globalnodesResource, globalNode), &v1alpha1.GlobalNode{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.GlobalNode), err +} + +// Update takes the representation of a globalNode and updates it. Returns the server's representation of the globalNode, and an error, if there is any. +func (c *FakeGlobalNodes) Update(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.UpdateOptions) (result *v1alpha1.GlobalNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(globalnodesResource, globalNode), &v1alpha1.GlobalNode{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.GlobalNode), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeGlobalNodes) UpdateStatus(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.UpdateOptions) (*v1alpha1.GlobalNode, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(globalnodesResource, "status", globalNode), &v1alpha1.GlobalNode{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.GlobalNode), err +} + +// Delete takes name of the globalNode and deletes it. Returns an error if one occurs. +func (c *FakeGlobalNodes) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteActionWithOptions(globalnodesResource, name, opts), &v1alpha1.GlobalNode{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeGlobalNodes) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(globalnodesResource, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.GlobalNodeList{}) + return err +} + +// Patch applies the patch and returns the patched globalNode. +func (c *FakeGlobalNodes) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.GlobalNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(globalnodesResource, name, pt, data, subresources...), &v1alpha1.GlobalNode{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.GlobalNode), err +} diff --git a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_kosmos_client.go b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_kosmos_client.go index 0d4a2da4f..cc8a3a6b0 100644 --- a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_kosmos_client.go +++ b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/fake/fake_kosmos_client.go @@ -36,6 +36,10 @@ func (c *FakeKosmosV1alpha1) DistributionPolicies(namespace string) v1alpha1.Dis return &FakeDistributionPolicies{c, namespace} } +func (c *FakeKosmosV1alpha1) GlobalNodes() v1alpha1.GlobalNodeInterface { + return &FakeGlobalNodes{c} +} + func (c *FakeKosmosV1alpha1) Knodes() v1alpha1.KnodeInterface { return &FakeKnodes{c} } diff --git a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/generated_expansion.go index bdba06cb2..fb8e102e4 100644 --- a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/generated_expansion.go +++ b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/generated_expansion.go @@ -14,6 +14,8 @@ type DaemonSetExpansion interface{} type DistributionPolicyExpansion interface{} +type GlobalNodeExpansion interface{} + type KnodeExpansion interface{} type NodeConfigExpansion interface{} diff --git a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/globalnode.go b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/globalnode.go new file mode 100644 index 000000000..a042cda05 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/globalnode.go @@ -0,0 +1,168 @@ +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + scheme "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// GlobalNodesGetter has a method to return a GlobalNodeInterface. +// A group's client should implement this interface. +type GlobalNodesGetter interface { + GlobalNodes() GlobalNodeInterface +} + +// GlobalNodeInterface has methods to work with GlobalNode resources. +type GlobalNodeInterface interface { + Create(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.CreateOptions) (*v1alpha1.GlobalNode, error) + Update(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.UpdateOptions) (*v1alpha1.GlobalNode, error) + UpdateStatus(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.UpdateOptions) (*v1alpha1.GlobalNode, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.GlobalNode, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.GlobalNodeList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.GlobalNode, err error) + GlobalNodeExpansion +} + +// globalNodes implements GlobalNodeInterface +type globalNodes struct { + client rest.Interface +} + +// newGlobalNodes returns a GlobalNodes +func newGlobalNodes(c *KosmosV1alpha1Client) *globalNodes { + return &globalNodes{ + client: c.RESTClient(), + } +} + +// Get takes name of the globalNode, and returns the corresponding globalNode object, and an error if there is any. +func (c *globalNodes) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.GlobalNode, err error) { + result = &v1alpha1.GlobalNode{} + err = c.client.Get(). + Resource("globalnodes"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of GlobalNodes that match those selectors. +func (c *globalNodes) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.GlobalNodeList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.GlobalNodeList{} + err = c.client.Get(). + Resource("globalnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested globalNodes. +func (c *globalNodes) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Resource("globalnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a globalNode and creates it. Returns the server's representation of the globalNode, and an error, if there is any. +func (c *globalNodes) Create(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.CreateOptions) (result *v1alpha1.GlobalNode, err error) { + result = &v1alpha1.GlobalNode{} + err = c.client.Post(). + Resource("globalnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(globalNode). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a globalNode and updates it. Returns the server's representation of the globalNode, and an error, if there is any. +func (c *globalNodes) Update(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.UpdateOptions) (result *v1alpha1.GlobalNode, err error) { + result = &v1alpha1.GlobalNode{} + err = c.client.Put(). + Resource("globalnodes"). + Name(globalNode.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(globalNode). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *globalNodes) UpdateStatus(ctx context.Context, globalNode *v1alpha1.GlobalNode, opts v1.UpdateOptions) (result *v1alpha1.GlobalNode, err error) { + result = &v1alpha1.GlobalNode{} + err = c.client.Put(). + Resource("globalnodes"). + Name(globalNode.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(globalNode). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the globalNode and deletes it. Returns an error if one occurs. +func (c *globalNodes) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Resource("globalnodes"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *globalNodes) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Resource("globalnodes"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched globalNode. +func (c *globalNodes) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.GlobalNode, err error) { + result = &v1alpha1.GlobalNode{} + err = c.client.Patch(pt). + Resource("globalnodes"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/kosmos_client.go b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/kosmos_client.go index 103738f28..bbb6aa9a1 100644 --- a/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/kosmos_client.go +++ b/pkg/generated/clientset/versioned/typed/kosmos/v1alpha1/kosmos_client.go @@ -18,6 +18,7 @@ type KosmosV1alpha1Interface interface { ClusterPodConvertPoliciesGetter DaemonSetsGetter DistributionPoliciesGetter + GlobalNodesGetter KnodesGetter NodeConfigsGetter PodConvertPoliciesGetter @@ -54,6 +55,10 @@ func (c *KosmosV1alpha1Client) DistributionPolicies(namespace string) Distributi return newDistributionPolicies(c, namespace) } +func (c *KosmosV1alpha1Client) GlobalNodes() GlobalNodeInterface { + return newGlobalNodes(c) +} + func (c *KosmosV1alpha1Client) Knodes() KnodeInterface { return newKnodes(c) } diff --git a/pkg/generated/informers/externalversions/generic.go b/pkg/generated/informers/externalversions/generic.go index 297eb2dd8..84b081a3c 100644 --- a/pkg/generated/informers/externalversions/generic.go +++ b/pkg/generated/informers/externalversions/generic.go @@ -50,6 +50,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Kosmos().V1alpha1().DaemonSets().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("distributionpolicies"): return &genericInformer{resource: resource.GroupResource(), informer: f.Kosmos().V1alpha1().DistributionPolicies().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("globalnodes"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Kosmos().V1alpha1().GlobalNodes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("knodes"): return &genericInformer{resource: resource.GroupResource(), informer: f.Kosmos().V1alpha1().Knodes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("nodeconfigs"): diff --git a/pkg/generated/informers/externalversions/kosmos/v1alpha1/globalnode.go b/pkg/generated/informers/externalversions/kosmos/v1alpha1/globalnode.go new file mode 100644 index 000000000..6a6093c5d --- /dev/null +++ b/pkg/generated/informers/externalversions/kosmos/v1alpha1/globalnode.go @@ -0,0 +1,73 @@ +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + versioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" + internalinterfaces "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// GlobalNodeInformer provides access to a shared informer and lister for +// GlobalNodes. +type GlobalNodeInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.GlobalNodeLister +} + +type globalNodeInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewGlobalNodeInformer constructs a new informer for GlobalNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewGlobalNodeInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredGlobalNodeInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredGlobalNodeInformer constructs a new informer for GlobalNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredGlobalNodeInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.KosmosV1alpha1().GlobalNodes().List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.KosmosV1alpha1().GlobalNodes().Watch(context.TODO(), options) + }, + }, + &kosmosv1alpha1.GlobalNode{}, + resyncPeriod, + indexers, + ) +} + +func (f *globalNodeInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredGlobalNodeInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *globalNodeInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&kosmosv1alpha1.GlobalNode{}, f.defaultInformer) +} + +func (f *globalNodeInformer) Lister() v1alpha1.GlobalNodeLister { + return v1alpha1.NewGlobalNodeLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/informers/externalversions/kosmos/v1alpha1/interface.go b/pkg/generated/informers/externalversions/kosmos/v1alpha1/interface.go index ec48ae33a..5af1f3b5c 100644 --- a/pkg/generated/informers/externalversions/kosmos/v1alpha1/interface.go +++ b/pkg/generated/informers/externalversions/kosmos/v1alpha1/interface.go @@ -20,6 +20,8 @@ type Interface interface { DaemonSets() DaemonSetInformer // DistributionPolicies returns a DistributionPolicyInformer. DistributionPolicies() DistributionPolicyInformer + // GlobalNodes returns a GlobalNodeInformer. + GlobalNodes() GlobalNodeInformer // Knodes returns a KnodeInformer. Knodes() KnodeInformer // NodeConfigs returns a NodeConfigInformer. @@ -73,6 +75,11 @@ func (v *version) DistributionPolicies() DistributionPolicyInformer { return &distributionPolicyInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// GlobalNodes returns a GlobalNodeInformer. +func (v *version) GlobalNodes() GlobalNodeInformer { + return &globalNodeInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // Knodes returns a KnodeInformer. func (v *version) Knodes() KnodeInformer { return &knodeInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/listers/kosmos/v1alpha1/expansion_generated.go b/pkg/generated/listers/kosmos/v1alpha1/expansion_generated.go index 36ec4c7d9..58bc0576a 100644 --- a/pkg/generated/listers/kosmos/v1alpha1/expansion_generated.go +++ b/pkg/generated/listers/kosmos/v1alpha1/expansion_generated.go @@ -34,6 +34,10 @@ type DistributionPolicyListerExpansion interface{} // DistributionPolicyNamespaceLister. type DistributionPolicyNamespaceListerExpansion interface{} +// GlobalNodeListerExpansion allows custom methods to be added to +// GlobalNodeLister. +type GlobalNodeListerExpansion interface{} + // KnodeListerExpansion allows custom methods to be added to // KnodeLister. type KnodeListerExpansion interface{} diff --git a/pkg/generated/listers/kosmos/v1alpha1/globalnode.go b/pkg/generated/listers/kosmos/v1alpha1/globalnode.go new file mode 100644 index 000000000..b8ccdd5ba --- /dev/null +++ b/pkg/generated/listers/kosmos/v1alpha1/globalnode.go @@ -0,0 +1,52 @@ +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// GlobalNodeLister helps list GlobalNodes. +// All objects returned here must be treated as read-only. +type GlobalNodeLister interface { + // List lists all GlobalNodes in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.GlobalNode, err error) + // Get retrieves the GlobalNode from the index for a given name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.GlobalNode, error) + GlobalNodeListerExpansion +} + +// globalNodeLister implements the GlobalNodeLister interface. +type globalNodeLister struct { + indexer cache.Indexer +} + +// NewGlobalNodeLister returns a new GlobalNodeLister. +func NewGlobalNodeLister(indexer cache.Indexer) GlobalNodeLister { + return &globalNodeLister{indexer: indexer} +} + +// List lists all GlobalNodes in the indexer. +func (s *globalNodeLister) List(selector labels.Selector) (ret []*v1alpha1.GlobalNode, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.GlobalNode)) + }) + return ret, err +} + +// Get retrieves the GlobalNode from the index for a given name. +func (s *globalNodeLister) Get(name string) (*v1alpha1.GlobalNode, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("globalnode"), name) + } + return obj.(*v1alpha1.GlobalNode), nil +} diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 089cd5509..d6dab7a3b 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -45,6 +45,10 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.DistributionPolicyList": schema_pkg_apis_kosmos_v1alpha1_DistributionPolicyList(ref), "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.DistributionSpec": schema_pkg_apis_kosmos_v1alpha1_DistributionSpec(ref), "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.Fdb": schema_pkg_apis_kosmos_v1alpha1_Fdb(ref), + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNode": schema_pkg_apis_kosmos_v1alpha1_GlobalNode(ref), + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeList": schema_pkg_apis_kosmos_v1alpha1_GlobalNodeList(ref), + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeSpec": schema_pkg_apis_kosmos_v1alpha1_GlobalNodeSpec(ref), + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeStatus": schema_pkg_apis_kosmos_v1alpha1_GlobalNodeStatus(ref), "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.Iptables": schema_pkg_apis_kosmos_v1alpha1_Iptables(ref), "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.Knode": schema_pkg_apis_kosmos_v1alpha1_Knode(ref), "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.KnodeList": schema_pkg_apis_kosmos_v1alpha1_KnodeList(ref), @@ -1543,6 +1547,175 @@ func schema_pkg_apis_kosmos_v1alpha1_Fdb(ref common.ReferenceCallback) common.Op } } +func schema_pkg_apis_kosmos_v1alpha1_GlobalNode(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "spec": { + SchemaProps: spec.SchemaProps{ + Description: "Spec is the specification for the behaviour of the GlobalNodeSpec.", + Default: map[string]interface{}{}, + Ref: ref("github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeSpec"), + }, + }, + "status": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeStatus"), + }, + }, + }, + Required: []string{"spec"}, + }, + }, + Dependencies: []string{ + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeSpec", "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNodeStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + } +} + +func schema_pkg_apis_kosmos_v1alpha1_GlobalNodeList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNode"), + }, + }, + }, + }, + }, + }, + Required: []string{"metadata", "items"}, + }, + }, + Dependencies: []string{ + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1.GlobalNode", "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"}, + } +} + +func schema_pkg_apis_kosmos_v1alpha1_GlobalNodeSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "nodeIP": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "state": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "labels": { + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func schema_pkg_apis_kosmos_v1alpha1_GlobalNodeStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "virtualCluster": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "conditions": { + SchemaProps: spec.SchemaProps{ + Description: "Conditions is an array of current observed node conditions. More info: https://kubernetes.io/docs/concepts/nodes/node/#condition", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/api/core/v1.NodeCondition"), + }, + }, + }, + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/api/core/v1.NodeCondition"}, + } +} + func schema_pkg_apis_kosmos_v1alpha1_Iptables(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -2349,11 +2522,13 @@ func schema_pkg_apis_kosmos_v1alpha1_PromotePolicy(ref common.ReferenceCallback) "nodeCount": { SchemaProps: spec.SchemaProps{ Description: "NodeCount is the number of nodes to promote to the kubernetes's control plane", + Default: 0, Type: []string{"integer"}, Format: "int32", }, }, }, + Required: []string{"nodeCount"}, }, }, Dependencies: []string{ diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 35fbb5add..478b5cc1e 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -82,7 +82,6 @@ const ( ManifestComponentsConfigmap = "components-manifest-cm" NodePoolConfigmap = "node-pool" - NodeShareState = "share" NodeVirtualclusterState = "virtualcluster" NodeFreeState = "free" diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/host_port_manager.go b/pkg/kubenest/controller/virtualcluster.node.controller/hostport_manager.go similarity index 100% rename from pkg/kubenest/controller/virtualcluster.node.controller/host_port_manager.go rename to pkg/kubenest/controller/virtualcluster.node.controller/hostport_manager.go diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/join-worker.go b/pkg/kubenest/controller/virtualcluster.node.controller/join_worker.go similarity index 68% rename from pkg/kubenest/controller/virtualcluster.node.controller/join-worker.go rename to pkg/kubenest/controller/virtualcluster.node.controller/join_worker.go index bdfeb8043..5e4d9ecfe 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/join-worker.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/join_worker.go @@ -17,7 +17,6 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" - vcrnodepoolcontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.nodepool.controller" ) // kubeadm join @@ -38,24 +37,24 @@ func isNodeReady(conditions []v1.NodeCondition) bool { return false } -func (r *NodeController) WaitNodeReady(ctx context.Context, nodeInfo vcrnodepoolcontroller.NodeItem, k8sClient kubernetes.Interface) error { +func (r *NodeController) WaitNodeReady(ctx context.Context, globalNode v1alpha1.GlobalNode, k8sClient kubernetes.Interface) error { waitCtx, cancel := context.WithTimeout(ctx, 60*time.Second) // total waiting time defer cancel() isReady := false wait.UntilWithContext(waitCtx, func(ctx context.Context) { - node, err := k8sClient.CoreV1().Nodes().Get(waitCtx, nodeInfo.Name, metav1.GetOptions{}) + node, err := k8sClient.CoreV1().Nodes().Get(waitCtx, globalNode.Name, metav1.GetOptions{}) if err == nil { if isNodeReady(node.Status.Conditions) { - klog.V(4).Infof("node %s is ready", nodeInfo.Name) + klog.V(4).Infof("node %s is ready", globalNode.Name) isReady = true cancel() } else { - klog.V(4).Infof("node %s is not ready, status: %s", nodeInfo.Name, node.Status.Phase) + klog.V(4).Infof("node %s is not ready, status: %s", globalNode.Name, node.Status.Phase) } } else { - klog.V(4).Infof("get node %s failed: %s", nodeInfo.Name, err) + klog.V(4).Infof("get node %s failed: %s", globalNode.Name, err) } }, 10*time.Second) // Interval time @@ -63,11 +62,11 @@ func (r *NodeController) WaitNodeReady(ctx context.Context, nodeInfo vcrnodepool return nil } - return fmt.Errorf("node %s is not ready", nodeInfo.Name) + return fmt.Errorf("node %s is not ready", globalNode.Name) } -func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolcontroller.NodeItem, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { - if len(nodeInfos) == 0 { +func (r *NodeController) joinNode(ctx context.Context, globalNodes []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { + if len(globalNodes) == 0 { return nil } @@ -81,9 +80,9 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco clusterDNS = dnssvc.Spec.ClusterIP } - for _, nodeInfo := range nodeInfos { + for _, globalNode := range globalNodes { // add node to new cluster - exectHelper := exector.NewExectorHelper(nodeInfo.Address, "") + exectHelper := exector.NewExectorHelper(globalNode.Spec.NodeIP, "") // check checkCmd := &exector.CMDExector{ @@ -91,7 +90,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco } ret := exectHelper.DoExector(ctx.Done(), checkCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("check node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("check node %s failed: %s", globalNode.Name, ret.String()) } // step(1/5) reset node @@ -100,7 +99,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco } ret = exectHelper.DoExector(ctx.Done(), resetCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("reset node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("reset node %s failed: %s", globalNode.Name, ret.String()) } // step(2/5) scp ca of virtualcluster nn := types.NamespacedName{ @@ -120,7 +119,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco } ret = exectHelper.DoExector(ctx.Done(), scpCrtCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("scp ca.crt to node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("scp ca.crt to node %s failed: %s", globalNode.Name, ret.String()) } // step(3/5) scp kubeconfig of virtualcluster @@ -136,7 +135,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco } ret = exectHelper.DoExector(ctx.Done(), scpKCCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("scp kubeconfig to node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("scp kubeconfig to node %s failed: %s", globalNode.Name, ret.String()) } // step(4/5) scp kubelet config @@ -152,7 +151,7 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco ret = exectHelper.DoExector(ctx.Done(), scpKubeletConfigCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("scp kubelet config to node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("scp kubelet config to node %s failed: %s", globalNode.Name, ret.String()) } // step(5/5) join node @@ -161,31 +160,35 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []vcrnodepoolco } ret = exectHelper.DoExector(ctx.Done(), joinCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("join node %s failed: %s", globalNode.Name, ret.String()) } // wait node ready - if err := r.WaitNodeReady(ctx, nodeInfo, k8sClient); err != nil { + if err := r.WaitNodeReady(ctx, globalNode, k8sClient); err != nil { + klog.Errorf("wait node %s ready failed: %s", globalNode.Name, err) return err } // TODO: maybe change kubeadm-flags.env // add label - node, err := k8sClient.CoreV1().Nodes().Get(ctx, nodeInfo.Name, metav1.GetOptions{}) + node, err := k8sClient.CoreV1().Nodes().Get(ctx, globalNode.Name, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("get node %s failed: %s", nodeInfo.Name, err) + return fmt.Errorf("get node %s failed: %s", globalNode.Name, err) } updateNode := node.DeepCopy() - for k, v := range nodeInfo.Labels { + for k, v := range globalNode.Spec.Labels { node.Labels[k] = v } if _, err := k8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("add label to node %s failed: %s", nodeInfo.Name, err) + return fmt.Errorf("add label to node %s failed: %s", globalNode.Name, err) } - // update nodepool status - if err := r.UpdateNodePoolState(ctx, nodeInfo.Name, NodePoolStateVirtualCluster); err != nil { + + updateGlobalNode := globalNode.DeepCopy() + updateGlobalNode.Spec.State = v1alpha1.NodeInUse + if err := r.Client.Update(context.TODO(), updateGlobalNode); err != nil { + klog.Errorf("update global node %s state failed: %s", globalNode.Name, err) return err } } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go index b8f7f48ee..3079fe099 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go @@ -8,7 +8,6 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" @@ -24,7 +23,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - vcrnodepoolcontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.nodepool.controller" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" "github.com/kosmos.io/kosmos/pkg/utils" ) @@ -63,7 +62,7 @@ func (r *NodeController) SetupWithManager(mgr manager.Manager) error { Complete(r) } -func (c *NodeController) GenerateKubeclient(virtualCluster *v1alpha1.VirtualCluster) (kubernetes.Interface, error) { +func (r *NodeController) GenerateKubeclient(virtualCluster *v1alpha1.VirtualCluster) (kubernetes.Interface, error) { if len(virtualCluster.Spec.Kubeconfig) == 0 { return nil, fmt.Errorf("virtualcluster %s kubeconfig is empty", virtualCluster.Name) } @@ -89,9 +88,14 @@ func hasItemInArray(name string, f func(string) bool) bool { return f(name) } -func compareAndTranformNodes(targetNodes []v1alpha1.NodeInfo, actualNodes []v1.Node, nodePools map[string]vcrnodepoolcontroller.NodeItem) ([]vcrnodepoolcontroller.NodeItem, []vcrnodepoolcontroller.NodeItem, error) { - unjoinNodes := []vcrnodepoolcontroller.NodeItem{} - joinNodes := []vcrnodepoolcontroller.NodeItem{} +func (r *NodeController) compareAndTranformNodes(targetNodes []v1alpha1.NodeInfo, actualNodes []v1.Node) ([]v1alpha1.GlobalNode, []v1alpha1.GlobalNode, error) { + unjoinNodes := make([]v1alpha1.GlobalNode, 0) + joinNodes := make([]v1alpha1.GlobalNode, 0) + + globalNodes := &v1alpha1.GlobalNodeList{} + if err := r.Client.List(context.TODO(), globalNodes); err != nil { + return nil, nil, fmt.Errorf("failed to list global nodes: %v", err) + } // cacheMap := map[string]string{} for _, targetNode := range targetNodes { @@ -105,11 +109,11 @@ func compareAndTranformNodes(targetNodes []v1alpha1.NodeInfo, actualNodes []v1.N }) if !has { - nodePool, ok := nodePools[targetNode.NodeName] + globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items) if !ok { - return nil, nil, fmt.Errorf("node %s not found in node pool", targetNode.NodeName) + return nil, nil, fmt.Errorf("global node %s not found", targetNode.NodeName) } - joinNodes = append(joinNodes, nodePool) + joinNodes = append(joinNodes, *globalNode) } } @@ -124,32 +128,17 @@ func compareAndTranformNodes(targetNodes []v1alpha1.NodeInfo, actualNodes []v1.N }) if !has { - nodePool, ok := nodePools[actualNode.Name] + globalNode, ok := util.FindGlobalNode(actualNode.Name, globalNodes.Items) if !ok { - return nil, nil, fmt.Errorf("node %s not found in node pool", actualNode.Name) + return nil, nil, fmt.Errorf("global node %s not found", actualNode.Name) } - - unjoinNodes = append(unjoinNodes, nodePool) + unjoinNodes = append(unjoinNodes, *globalNode) } } return unjoinNodes, joinNodes, nil } -func (r *NodeController) GetNodePool(ctx context.Context) (map[string]vcrnodepoolcontroller.NodeItem, error) { - nodePool := v1.ConfigMap{} - if err := r.Client.Get(ctx, types.NamespacedName{Name: NodePoolCMName, Namespace: NodePoolCMNS}, &nodePool); err != nil { - return nil, fmt.Errorf("get node-pool failed: %v", err) - } - - nodePools, err := vcrnodepoolcontroller.ConvertYamlToNodeItem(nodePool.Data[NodePoolCMKeyName]) - if err != nil { - return nil, fmt.Errorf("convert node-pool failed: %v", err) - } - - return nodePools, nil -} - func (r *NodeController) UpdateVirtualClusterStatus(ctx context.Context, virtualCluster v1alpha1.VirtualCluster, status v1alpha1.Phase, reason string) error { retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { targetObj := v1alpha1.VirtualCluster{} @@ -186,13 +175,8 @@ func (r *NodeController) DoNodeTask(ctx context.Context, virtualCluster v1alpha1 return fmt.Errorf("virtualcluster %s get virtual-cluster nodes list failed: %v", virtualCluster.Name, err) } - nodePools, err := r.GetNodePool(ctx) - if err != nil { - return err - } - // compare cr and actual nodes in k8s - unjoinNodes, joinNodes, err := compareAndTranformNodes(virtualCluster.Spec.PromoteResources.NodeInfos, nodes.Items, nodePools) + unjoinNodes, joinNodes, err := r.compareAndTranformNodes(virtualCluster.Spec.PromoteResources.NodeInfos, nodes.Items) if err != nil { return fmt.Errorf("compare cr and actual nodes failed, virtual-cluster-name: %v, err: %s", virtualCluster.Name, err) } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/share.go b/pkg/kubenest/controller/virtualcluster.node.controller/share.go index 2770556cf..e69de29bb 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/share.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/share.go @@ -1,49 +0,0 @@ -package vcnodecontroller - -import ( - "context" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" - - vcrnodepoolcontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.nodepool.controller" -) - -// TODO: biz -func (r *NodeController) UpdateNodePoolState(ctx context.Context, nodeName string, nodePoolState string) error { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - nodePool := v1.ConfigMap{} - if err := r.Client.Get(ctx, types.NamespacedName{Name: NodePoolCMName, Namespace: NodePoolCMNS}, &nodePool); err != nil { - return err - } - - updateNodePool := nodePool.DeepCopy() - - yamlStr := updateNodePool.Data[NodePoolCMKeyName] - nodePoolItem, err := vcrnodepoolcontroller.ConvertYamlToNodePoolItem(yamlStr) - if err != nil { - return err - } - - targetNodePoolItem := nodePoolItem[nodeName] - targetNodePoolItem.State = nodePoolState - - nodePoolItem[nodeName] = targetNodePoolItem - - nodePoolBytes, err := vcrnodepoolcontroller.ConvertNodePoolItemToYaml(nodePoolItem) - if err != nil { - return err - } - - updateNodePool.Data[NodePoolCMKeyName] = string(nodePoolBytes) - - if err := r.Client.Update(ctx, updateNodePool); err != nil { - return err - } - - return nil - }) - - return err -} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/unjoin-worker.go b/pkg/kubenest/controller/virtualcluster.node.controller/unjoin_worker.go similarity index 58% rename from pkg/kubenest/controller/virtualcluster.node.controller/unjoin-worker.go rename to pkg/kubenest/controller/virtualcluster.node.controller/unjoin_worker.go index 6deeb904d..35ccae0f2 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/unjoin-worker.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/unjoin_worker.go @@ -9,11 +9,11 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" - vcrnodepoolcontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.nodepool.controller" ) -func (r *NodeController) joinNodeToHost(ctx context.Context, nodeInfo vcrnodepoolcontroller.NodeItem) error { +func (r *NodeController) joinNodeToHost(ctx context.Context, globalNode v1alpha1.GlobalNode) error { masterNodeIP := os.Getenv("EXECTOR_HOST_MASTER_NODE_IP") hostPort := "" if len(masterNodeIP) == 0 { @@ -26,14 +26,14 @@ func (r *NodeController) joinNodeToHost(ctx context.Context, nodeInfo vcrnodepoo // step(1/3) get join cmd ret := hostExectorHelper.DoExector(ctx.Done(), joinCmdStrCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("get host join cmd on node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("get host join cmd on node %s failed: %s", globalNode.Name, ret.String()) } joinCmdStr, err := getJoinCmdStr(ret.LastLog) if err != nil { return err } - exectHelper := exector.NewExectorHelper(nodeInfo.Address, "") + exectHelper := exector.NewExectorHelper(globalNode.Spec.NodeIP, "") // step(2/3) remove node from old cluster resetCmd := &exector.CMDExector{ Cmd: "sh kubelet_node_helper.sh unjoin", @@ -41,7 +41,7 @@ func (r *NodeController) joinNodeToHost(ctx context.Context, nodeInfo vcrnodepoo ret = exectHelper.DoExector(ctx.Done(), resetCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("reset node %s failed: %s", nodeInfo.Name, ret.String()) + return fmt.Errorf("reset node %s failed: %s", globalNode.Name, ret.String()) } // step(3/3) add node to host-cluster @@ -51,33 +51,36 @@ func (r *NodeController) joinNodeToHost(ctx context.Context, nodeInfo vcrnodepoo ret = exectHelper.DoExector(ctx.Done(), joinCmd) if ret.Status != exector.SUCCESS { - return fmt.Errorf("exec join cmd on node %s failed: %s, join cmd: %s", nodeInfo.Name, ret.String(), joinCmdStr) + return fmt.Errorf("exec join cmd on node %s failed: %s, join cmd: %s", globalNode.Name, ret.String(), joinCmdStr) } return nil } -func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []vcrnodepoolcontroller.NodeItem, k8sClient kubernetes.Interface) error { +func (r *NodeController) unjoinNode(ctx context.Context, GlobalNodes []v1alpha1.GlobalNode, k8sClient kubernetes.Interface) error { // delete node from cluster - for _, nodeInfo := range nodeInfos { + for _, globalNode := range GlobalNodes { // remove node from cluster - klog.V(4).Infof("start remove node from cluster, node name: %s", nodeInfo.Name) - err := k8sClient.CoreV1().Nodes().Delete(ctx, nodeInfo.Name, metav1.DeleteOptions{}) + klog.V(4).Infof("start remove node from cluster, node name: %s", globalNode.Name) + err := k8sClient.CoreV1().Nodes().Delete(ctx, globalNode.Name, metav1.DeleteOptions{}) if err != nil { - klog.Errorf("remove node from cluster failed, node name: %s", nodeInfo.Name) - return fmt.Errorf("%s, %s", nodeInfo.Name, err) + klog.Errorf("remove node from cluster failed, node name: %s", globalNode.Name) + return fmt.Errorf("%s, %s", globalNode.Name, err) } - klog.V(4).Infof("remove node from cluster successed, node name: %s", nodeInfo.Name) + klog.V(4).Infof("remove node from cluster successed, node name: %s", globalNode.Name) // TODO: reset kubeadm-flags.env // TODO: move to node pool controller, add node to host cluster - if err := r.joinNodeToHost(ctx, nodeInfo); err != nil { - klog.Errorf("join node %s to host cluster failed: %s", nodeInfo.Name, err) + if err := r.joinNodeToHost(ctx, globalNode); err != nil { + klog.Errorf("join node %s to host cluster failed: %s", globalNode.Name, err) return err } - // update nodepool status - if err := r.UpdateNodePoolState(ctx, nodeInfo.Name, NodePoolStateFree); err != nil { + + updateGlobalNode := globalNode.DeepCopy() + updateGlobalNode.Spec.State = v1alpha1.NodeFreeState + if err := r.Client.Update(context.TODO(), updateGlobalNode); err != nil { + klog.Errorf("update global node %s state failed: %s", globalNode.Name, err) return err } } diff --git a/pkg/kubenest/controller/virtualcluster.nodepool.controller/nodepool.go b/pkg/kubenest/controller/virtualcluster.nodepool.controller/nodepool.go deleted file mode 100644 index 132c6fddf..000000000 --- a/pkg/kubenest/controller/virtualcluster.nodepool.controller/nodepool.go +++ /dev/null @@ -1,56 +0,0 @@ -package vcrnodepoolcontroller - -import ( - "gopkg.in/yaml.v3" -) - -type NodePoolMapItem struct { - Address string `yaml:"address"` - Labels map[string]string `yaml:"labels"` - Cluster string `yaml:"cluster"` - State string `yaml:"state"` -} - -type NodeItem struct { - NodePoolMapItem - Name string `yaml:"-"` -} - -func ConvertYamlToNodeItem(yamlStr string) (map[string]NodeItem, error) { - nodepoolMap := map[string]NodeItem{} - - nodepoolItem, err := ConvertYamlToNodePoolItem(yamlStr) - if err != nil { - return nil, err - } - - for k, v := range nodepoolItem { - nodepoolMap[k] = NodeItem{ - NodePoolMapItem: v, - Name: k, - } - } - - return nodepoolMap, nil -} - -func ConvertYamlToNodePoolItem(yamlStr string) (map[string]NodePoolMapItem, error) { - nodepoolItem := map[string]NodePoolMapItem{} - err := yaml.Unmarshal([]byte(yamlStr), &nodepoolItem) - if err != nil { - return nil, err - } - return nodepoolItem, nil -} - -func ConvertNodePoolItemToYaml(nodepoolItem map[string]NodePoolMapItem) ([]byte, error) { - yamlStr, err := yaml.Marshal(nodepoolItem) - if err != nil { - return nil, err - } - return yamlStr, nil -} - -// controller task -// TODO: free node need join to host cluster -// TODO: check orphan node diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 04785b592..03b74a90a 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -8,10 +8,10 @@ import ( "time" "github.com/pkg/errors" - "gopkg.in/yaml.v3" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + _ "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -30,6 +30,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) type VirtualClusterInitController struct { @@ -124,17 +125,15 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re } case v1alpha1.Completed: //update request, check if promotepolicy nodes increase or decrease. - assigned, err := c.assignWorkNodes(updatedCluster) + err := c.assignWorkNodes(updatedCluster) if err != nil { return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name) } - if assigned { // indicate nodes change request - updatedCluster.Status.Phase = v1alpha1.Updating - err = c.Update(originalCluster, updatedCluster) - if err != nil { - klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) - } + updatedCluster.Status.Phase = v1alpha1.Updating + err = c.Update(originalCluster, updatedCluster) + if err != nil { + klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } default: klog.Warningf("Skip virtualcluster %s reconcile status: %s", originalCluster.Name, originalCluster.Status.Phase) @@ -178,13 +177,13 @@ func (c *VirtualClusterInitController) ensureFinalizer(virtualCluster *v1alpha1. return reconcile.Result{}, nil } -func (r *VirtualClusterInitController) removeFinalizer(virtualCluster *v1alpha1.VirtualCluster) (reconcile.Result, error) { +func (c *VirtualClusterInitController) removeFinalizer(virtualCluster *v1alpha1.VirtualCluster) (reconcile.Result, error) { if !controllerutil.ContainsFinalizer(virtualCluster, VirtualClusterControllerFinalizer) { return reconcile.Result{}, nil } controllerutil.RemoveFinalizer(virtualCluster, VirtualClusterControllerFinalizer) - err := r.Client.Update(context.TODO(), virtualCluster) + err := c.Client.Update(context.TODO(), virtualCluster) if err != nil { return reconcile.Result{Requeue: true}, err } @@ -200,7 +199,7 @@ func (c *VirtualClusterInitController) createVirtualCluster(virtualCluster *v1al if err != nil { return err } - _, err = c.assignWorkNodes(virtualCluster) + err = c.assignWorkNodes(virtualCluster) if err != nil { return errors.Wrap(err, "Error in assign work nodes") } @@ -229,94 +228,37 @@ func (c *VirtualClusterInitController) destroyVirtualCluster(virtualCluster *v1a return execute.Execute() } -// assignWorkNodes assign nodes for virtualcluster when creating or updating. return true if successfully assigned -func (c *VirtualClusterInitController) assignWorkNodes(virtualCluster *v1alpha1.VirtualCluster) (bool, error) { - promotepolicies := virtualCluster.Spec.PromotePolicies - if len(promotepolicies) == 0 { - return false, errors.New("PromotePolicies parameter undefined") - } +func (c *VirtualClusterInitController) assignWorkNodes(virtualCluster *v1alpha1.VirtualCluster) error { c.lock.Lock() defer c.lock.Unlock() - nodePool, err := c.getNodePool() - klog.V(2).Infof("Get node pool %v", nodePool) - if err != nil { - return false, errors.Wrap(err, "Get node pool error.") - } - klog.V(2).Infof("Total %d nodes in pool", len(nodePool)) - assigned := false - for _, policy := range promotepolicies { - assignedByPolicy, nodeInfos, err := c.assignNodesByPolicy(virtualCluster, policy, nodePool) - if err != nil { - return false, errors.Wrap(err, "Reassign nodes error") - } - if !assignedByPolicy { - continue - } else { - assigned = true - virtualCluster.Spec.PromoteResources.NodeInfos = nodeInfos - } + globalNodeList := &v1alpha1.GlobalNodeList{} + if err := c.Client.List(context.TODO(), globalNodeList); err != nil { + return fmt.Errorf("list global nodes: %w", err) } - if assigned { - err := c.updateNodePool(nodePool) + allNodeInfos := make([]v1alpha1.NodeInfo, 0) + globalNodes := globalNodeList.Items + for _, policy := range virtualCluster.Spec.PromotePolicies { + nodeInfos, err := c.assignNodesByPolicy(virtualCluster, policy, globalNodes) if err != nil { - return false, errors.Wrap(err, "Update node pool error.") + return fmt.Errorf("assign nodes by policy: %w", err) } - } - return assigned, nil -} - -// getNodePool get node pool configmap -func (c *VirtualClusterInitController) getNodePool() (map[string]NodePool, error) { - nodesPoolCm, err := c.RootClientSet.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.NodePoolConfigmap, metav1.GetOptions{}) - if err != nil { - return nil, err - } - var nodesPool map[string]NodePool - data, ok := nodesPoolCm.Data["nodes"] - if !ok { - return nil, errors.New("Error parse nodes pool data") - } - err = yaml.Unmarshal([]byte(data), &nodesPool) - if err != nil { - return nil, errors.Wrap(err, "Unmarshal nodes pool data error") - } - return nodesPool, nil -} - -// updateNodePool update node pool configmap -func (c *VirtualClusterInitController) updateNodePool(nodePool map[string]NodePool) error { - klog.V(2).Infof("Update node pool %v", nodePool) - nodePoolYAML, err := yaml.Marshal(nodePool) - if err != nil { - return errors.Wrap(err, "Serialized node pool data error") + allNodeInfos = append(allNodeInfos, nodeInfos...) } - originalCm, err := c.RootClientSet.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.TODO(), constants.NodePoolConfigmap, metav1.GetOptions{}) - if err != nil { - return err - } - originalCm.Data = map[string]string{ - "nodes": string(nodePoolYAML), - } - - _, err = c.RootClientSet.CoreV1().ConfigMaps(constants.KosmosNs).Update(context.TODO(), originalCm, metav1.UpdateOptions{}) - if err != nil { - return errors.Wrap(err, "Update node pool configmap data error") - } - klog.V(2).Info("Update node pool Success") + virtualCluster.Spec.PromoteResources.NodeInfos = allNodeInfos return nil } // nodesChangeCalculate calculate nodes changed when update virtualcluster. -func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, nodesPool map[string]NodePool) (bool, []v1alpha1.NodeInfo, error) { +func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alpha1.VirtualCluster, policy v1alpha1.PromotePolicy, globalNodes []v1alpha1.GlobalNode) ([]v1alpha1.NodeInfo, error) { var matched int32 = 0 var nodesAssignedMatchedPolicy []v1alpha1.NodeInfo var nodesAssignedUnMatched []v1alpha1.NodeInfo nodesAssigned := virtualCluster.Spec.PromoteResources.NodeInfos for _, nodeInfo := range nodesAssigned { - node, ok := nodesPool[nodeInfo.NodeName] + node, ok := util.FindGlobalNode(nodeInfo.NodeName, globalNodes) if !ok { - return false, nodesAssigned, errors.Errorf("Node %s doesn't find in nodes pool", nodeInfo.NodeName) + return nodesAssigned, errors.Errorf("Node %s doesn't find in nodes pool", nodeInfo.NodeName) } if mapContains(node.Labels, policy.LabelSelector.MatchLabels) { nodesAssignedMatchedPolicy = append(nodesAssignedMatchedPolicy, nodeInfo) @@ -325,21 +267,24 @@ func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alp nodesAssignedUnMatched = append(nodesAssignedUnMatched, nodeInfo) } } - requestNodesChanged := *policy.NodeCount - matched + requestNodesChanged := policy.NodeCount - matched if requestNodesChanged == 0 { klog.V(2).Infof("Nothing to do for policy %s", policy.LabelSelector.String()) - return false, nodesAssigned, nil + return nodesAssigned, nil } else if requestNodesChanged > 0 { // nodes needs to be increased klog.V(2).Infof("Try allocate %d nodes for policy %s", requestNodesChanged, policy.LabelSelector.String()) var cnt int32 = 0 - for name, nodeInfo := range nodesPool { - if nodeInfo.State == constants.NodeFreeState && mapContains(nodeInfo.Labels, policy.LabelSelector.MatchLabels) { - nodeInfo.State = constants.NodeVirtualclusterState - nodeInfo.Cluster = virtualCluster.Name - nodesPool[name] = nodeInfo + for _, globalNode := range globalNodes { + if globalNode.Spec.State == v1alpha1.NodeFreeState && mapContains(globalNode.Labels, policy.LabelSelector.MatchLabels) { nodesAssigned = append(nodesAssigned, v1alpha1.NodeInfo{ - NodeName: name, + NodeName: globalNode.Name, }) + // 更新globalNode的状态为占用状态 + updated := globalNode.DeepCopy() + updated.Spec.State = v1alpha1.NodeInUse + if err := c.Client.Update(context.TODO(), updated); err != nil { + return nodesAssigned, errors.Wrapf(err, "Failed to update globalNode %s to InUse", globalNode.Name) + } cnt++ } if cnt == requestNodesChanged { @@ -347,18 +292,18 @@ func (c *VirtualClusterInitController) assignNodesByPolicy(virtualCluster *v1alp } } if cnt < requestNodesChanged { - return false, nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, matched) + return nodesAssigned, errors.Errorf("There is not enough work nodes for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), requestNodesChanged, matched) } } else { // nodes needs to decrease klog.V(2).Infof("Try decrease nodes %d for policy %s", -requestNodesChanged, policy.LabelSelector.String()) decrease := int(-requestNodesChanged) if len(nodesAssignedMatchedPolicy) < decrease { - return false, nodesAssigned, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssignedMatchedPolicy)) + return nodesAssigned, errors.Errorf("Illegal work nodes decrease operation for promotepolicy %s. Desired %d, matched %d", policy.LabelSelector.String(), decrease, len(nodesAssignedMatchedPolicy)) } nodesAssigned = append(nodesAssignedUnMatched, nodesAssignedMatchedPolicy[:(len(nodesAssignedMatchedPolicy)-decrease)]...) // note: node pool will not be modified here. NodeController will modify it when node delete success } - return true, nodesAssigned, nil + return nodesAssigned, nil } func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1alpha1.VirtualCluster, timeout time.Duration) error { diff --git a/pkg/kubenest/util/util.go b/pkg/kubenest/util/util.go new file mode 100644 index 000000000..1b7569b1e --- /dev/null +++ b/pkg/kubenest/util/util.go @@ -0,0 +1,12 @@ +package util + +import "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + +func FindGlobalNode(nodeName string, globalNodes []v1alpha1.GlobalNode) (*v1alpha1.GlobalNode, bool) { + for _, globalNode := range globalNodes { + if globalNode.Name == nodeName { + return &globalNode, true + } + } + return nil, false +}