diff --git a/daemon/config.go b/daemon/config.go
index 950a334b..42c7bbf5 100644
--- a/daemon/config.go
+++ b/daemon/config.go
@@ -1,5 +1,12 @@
 package daemon
 
+import (
+    "github.com/AliyunContainerService/terway/pkg/aliyun"
+    "github.com/AliyunContainerService/terway/pkg/utils"
+    "github.com/AliyunContainerService/terway/types"
+    "github.com/AliyunContainerService/terway/types/daemon"
+)
+
 // getDynamicConfig returns (config, label, error) specified in node
 // ("", "", nil) for no dynamic config for this node
 func getDynamicConfig(k8s Kubernetes) (string, string, error) {
@@ -12,3 +19,122 @@ func getDynamicConfig(k8s Kubernetes) (string, string, error) {
 
     return cfg, label, err
 }
+
+// getPoolConfig builds the pool configuration; the effective pool size is bounded by MinIdle and MaxIdle
+func getPoolConfig(cfg *daemon.Config, daemonMode string, limit *aliyun.Limits) (*types.PoolConfig, error) {
+    poolConfig := &types.PoolConfig{
+        SecurityGroupIDs:          cfg.GetSecurityGroups(),
+        VSwitchSelectionPolicy:    cfg.VSwitchSelectionPolicy,
+        DisableSecurityGroupCheck: cfg.DisableSecurityGroupCheck,
+    }
+    if cfg.ENITags == nil {
+        cfg.ENITags = make(map[string]string)
+    }
+    cfg.ENITags[types.NetworkInterfaceTagCreatorKey] = types.NetworkInterfaceTagCreatorValue
+
+    poolConfig.ENITags = cfg.ENITags
+
+    capacity := 0
+    maxENI := 0
+    maxMemberENI := 0
+
+    switch daemonMode {
+    case daemonModeVPC, daemonModeENIOnly:
+        maxENI = limit.Adapters
+        maxENI = int(float64(maxENI)*cfg.EniCapRatio) + cfg.EniCapShift - 1
+
+        // set max eni node can use
+        if cfg.MaxENI > 0 && cfg.MaxENI < maxENI {
+            maxENI = cfg.MaxENI
+        }
+
+        capacity = maxENI
+
+        if cfg.MaxPoolSize > maxENI {
+            poolConfig.MaxPoolSize = maxENI
+        } else {
+            poolConfig.MaxPoolSize = cfg.MaxPoolSize
+        }
+
+        if cfg.MinENI > 0 {
+            poolConfig.MinPoolSize = cfg.MinENI
+        }
+
+        if poolConfig.MinPoolSize > poolConfig.MaxPoolSize {
+            poolConfig.MinPoolSize = poolConfig.MaxPoolSize
+        }
+
+        maxMemberENI = limit.MemberAdapterLimit
+        if cfg.ENICapPolicy == types.ENICapPolicyPreferTrunk {
+            maxMemberENI = limit.MaxMemberAdapterLimit
+        }
+
+        poolConfig.MaxIPPerENI = 1
+    case daemonModeENIMultiIP:
+        maxENI = limit.Adapters
+        maxENI = int(float64(maxENI)*cfg.EniCapRatio) + cfg.EniCapShift - 1
+
+        // set max eni node can use
+        if cfg.MaxENI > 0 && cfg.MaxENI < maxENI {
+            maxENI = cfg.MaxENI
+        }
+
+        ipPerENI := limit.IPv4PerAdapter
+        if utils.IsWindowsOS() {
+            // NB(thxCode): don't assign the primary IP of one assistant eni.
+            ipPerENI--
+        }
+
+        capacity = maxENI * ipPerENI
+        if cfg.MaxPoolSize > capacity {
+            poolConfig.MaxPoolSize = capacity
+        } else {
+            poolConfig.MaxPoolSize = cfg.MaxPoolSize
+        }
+
+        if cfg.MinENI > 0 {
+            poolConfig.MinPoolSize = cfg.MinENI * ipPerENI
+        }
+        if poolConfig.MinPoolSize > poolConfig.MaxPoolSize {
+            poolConfig.MinPoolSize = poolConfig.MaxPoolSize
+        }
+
+        maxMemberENI = limit.MemberAdapterLimit
+
+        poolConfig.MaxIPPerENI = ipPerENI
+    }
+
+    poolConfig.ENITags = cfg.ENITags
+
+    requireMeta := true
+    if cfg.IPAMType == types.IPAMTypeCRD {
+        poolConfig.MaxPoolSize = 0
+        poolConfig.MinPoolSize = 0
+
+        if cfg.DisableDevicePlugin {
+            requireMeta = false
+        }
+    }
+
+    if requireMeta {
+        ins := aliyun.GetInstanceMeta()
+        zone := ins.ZoneID
+        if cfg.VSwitches != nil {
+            zoneVswitchs, ok := cfg.VSwitches[zone]
+            if ok && len(zoneVswitchs) > 0 {
+                poolConfig.VSwitchOptions = cfg.VSwitches[zone]
+            }
+        }
+        if len(poolConfig.VSwitchOptions) == 0 {
+            poolConfig.VSwitchOptions = []string{ins.VSwitchID}
+        }
+        poolConfig.ZoneID = zone
+        poolConfig.InstanceID = ins.InstanceID
+    }
+
+    poolConfig.Capacity = capacity
+    poolConfig.MaxENI = maxENI
+    poolConfig.MaxMemberENI = maxMemberENI
+
+    return poolConfig, nil
+}
diff --git a/daemon/daemon.go b/daemon/daemon.go
index 058531e2..f4fa38b7 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -12,11 +12,13 @@ import (
     "sync"
     "time"
 
+    "github.com/AliyunContainerService/terway/deviceplugin"
     "github.com/AliyunContainerService/terway/pkg/aliyun"
     "github.com/AliyunContainerService/terway/pkg/aliyun/client"
     "github.com/AliyunContainerService/terway/pkg/aliyun/credential"
     podENITypes "github.com/AliyunContainerService/terway/pkg/apis/network.alibabacloud.com/v1beta1"
     "github.com/AliyunContainerService/terway/pkg/backoff"
+    vswpool "github.com/AliyunContainerService/terway/pkg/controller/vswitch"
     terwayIP "github.com/AliyunContainerService/terway/pkg/ip"
     "github.com/AliyunContainerService/terway/pkg/link"
     "github.com/AliyunContainerService/terway/pkg/logger"
@@ -1335,7 +1337,7 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
     if ipFamily.IPv6 {
         if !limit.SupportIPv6() {
             ipFamily.IPv6 = false
-            serviceLog.Warnf("instance %s is not support ipv6", aliyun.GetInstanceMeta().InstanceType)
+            serviceLog.Warnf("instance %s is not support ipv6", config.InstanceType)
         } else if daemonMode == daemonModeENIMultiIP && !limit.SupportMultiIPIPv6() {
             ipFamily.IPv6 = false
             serviceLog.Warnf("instance %s is not support ipv6", config.InstanceType)
@@ -1344,22 +1346,6 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
 
     ecs := aliyun.NewAliyunImpl(aliyunClient, config.EnableENITrunking && !config.WaitTrunkENI, ipFamily, config.ENITagFilter)
 
-    netSrv.enableTrunk = config.EnableENITrunking
-
-    ipNetSet := &types.IPNetSet{}
-    if config.ServiceCIDR != "" {
-        cidrs := strings.Split(config.ServiceCIDR, ",")
-
-        for _, cidr := range cidrs {
-            ipNetSet.SetIPNet(cidr)
-        }
-    }
-
-    err = netSrv.k8s.SetSvcCidr(ipNetSet)
-    if err != nil {
-        return nil, errors.Wrapf(err, "error set k8s svcCidr")
-    }
-
     netSrv.resourceDB, err = storage.NewDiskStorage(
         resDBName, utils.NormalizePath(resDBPath), json.Marshal, func(bytes []byte) (interface{}, error) {
             resourceRel := &types.PodResources{}
@@ -1373,13 +1359,108 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
         return nil, errors.Wrapf(err, "error init resource manager storage")
     }
 
+    nodeAnnotations := map[string]string{}
+
     // get pool config
-    poolConfig, err := getPoolConfig(config)
+    poolConfig, err := getPoolConfig(config, daemonMode, limit)
     if err != nil {
         return nil, errors.Wrapf(err, "error get pool config")
     }
     serviceLog.Infof("init pool config: %+v", poolConfig)
 
+    vswPool, err := vswpool.NewSwitchPool(100, "10m")
+    if err != nil {
+        return nil, fmt.Errorf("error init vsw pool, %w", err)
+    }
+
+    // init trunk
+    if config.EnableENITrunking {
+        preferTrunkID := netSrv.k8s.GetTrunkID()
+        if preferTrunkID == "" && config.WaitTrunkENI {
+            preferTrunkID, err = netSrv.k8s.WaitTrunkReady()
+            if err != nil {
+                return nil, fmt.Errorf("error wait trunk ready, %w", err)
+            }
+        }
+
+        if !config.WaitTrunkENI {
+            enis, err := ecs.GetAttachedENIs(ctx, false, preferTrunkID)
+            if err != nil {
+                return nil, fmt.Errorf("error get attached eni, %w", err)
+            }
+            found := false
+            for _, eni := range enis {
+                if eni.Trunk && eni.ID == preferTrunkID {
+                    found = true
+
+                    poolConfig.TrunkENIID = preferTrunkID
+                    netSrv.enableTrunk = true
+
+                    nodeAnnotations[types.TrunkOn] = preferTrunkID
+                    nodeAnnotations[string(types.MemberENIIPTypeIPs)] = strconv.Itoa(poolConfig.MaxMemberENI)
+                    break
+                }
+            }
+            if !found {
+                if poolConfig.MaxENI > len(enis) {
+                    vsw, err := vswPool.GetOne(ctx, ecs, poolConfig.ZoneID, poolConfig.VSwitchOptions, &vswpool.SelectOptions{
+                        VSwitchSelectPolicy: vswpool.VSwitchSelectionPolicyMost,
+                    })
+                    if err != nil {
+                        return nil, fmt.Errorf("error get vsw, %w", err)
+                    }
+
+                    eni, err := ecs.AllocateENI(ctx, vsw.ID, poolConfig.SecurityGroupIDs, poolConfig.InstanceID, true, 1, poolConfig.ENITags)
+                    if err != nil {
+                        return nil, fmt.Errorf("error allocate eni, %w", err)
+                    }
+
+                    poolConfig.TrunkENIID = eni.ID
+                    netSrv.enableTrunk = true
+
+                    nodeAnnotations[types.TrunkOn] = eni.ID
+                    nodeAnnotations[string(types.MemberENIIPTypeIPs)] = strconv.Itoa(poolConfig.MaxMemberENI)
+                } else {
+                    serviceLog.Warnf("no trunk eni found, falling back to non-trunk mode")
+
+                    config.EnableENITrunking = false
+                    config.DisableDevicePlugin = true
+                }
+            }
+        } else {
+            // WaitTrunkENI is enabled; trust the trunk ENI ID we were given.
+            poolConfig.TrunkENIID = preferTrunkID
+            netSrv.enableTrunk = true
+
+            nodeAnnotations[types.TrunkOn] = preferTrunkID
+            nodeAnnotations[string(types.MemberENIIPTypeIPs)] = strconv.Itoa(poolConfig.MaxMemberENI)
+        }
+    }
+
+    if daemonMode != daemonModeVPC {
+        nodeAnnotations[string(types.NormalIPTypeIPs)] = strconv.Itoa(poolConfig.Capacity)
+    }
+
+    if !(daemonMode == daemonModeENIMultiIP && !config.EnableENITrunking) {
+        if !config.DisableDevicePlugin {
+            res := deviceplugin.ENITypeENI
+            capacity := poolConfig.MaxENI
+            if config.EnableENITrunking {
+                res = deviceplugin.ENITypeMember
+                capacity = poolConfig.MaxMemberENI
+            }
+
+            dp := deviceplugin.NewENIDevicePlugin(capacity, res)
+            go dp.Serve()
+        }
+    }
+
+    // ensure node annotations
+    err = netSrv.k8s.PatchNodeAnnotations(nodeAnnotations)
+    if err != nil {
+        return nil, fmt.Errorf("error patch node annotations, %w", err)
+    }
+
     localResource := make(map[string]map[string]resourceManagerInitItem)
     resObjList, err := netSrv.resourceDB.List()
     if err != nil {
@@ -1454,16 +1535,18 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
         panic("unsupported daemon mode" + daemonMode)
     }
 
-    //start gc loop
-    netSrv.startGarbageCollectionLoop()
-    period := poolCheckPeriod
-    periodCfg := os.Getenv("POOL_CHECK_PERIOD_SECONDS")
-    periodSeconds, err := strconv.Atoi(periodCfg)
-    if err == nil {
-        period = time.Duration(periodSeconds) * time.Second
-    }
+    if config.IPAMType != types.IPAMTypeCRD {
+        //start gc loop
+        netSrv.startGarbageCollectionLoop()
+        period := poolCheckPeriod
+        periodCfg := os.Getenv("POOL_CHECK_PERIOD_SECONDS")
+        periodSeconds, err := strconv.Atoi(periodCfg)
+        if err == nil {
+            period = time.Duration(periodSeconds) * time.Second
+        }
 
-    go wait.JitterUntil(netSrv.startPeriodCheck, period, 1, true, wait.NeverStop)
+        go wait.JitterUntil(netSrv.startPeriodCheck, period, 1, true, wait.NeverStop)
+    }
 
     // register for tracing
     _ = tracing.Register(tracing.ResourceTypeNetworkService, "default", netSrv)
@@ -1473,49 +1556,6 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
     return netSrv, nil
 }
 
-func getPoolConfig(cfg *daemon.Config) (*types.PoolConfig, error) {
-    poolConfig := &types.PoolConfig{
-        MaxPoolSize:               cfg.MaxPoolSize,
-        MinPoolSize:               cfg.MinPoolSize,
-        MaxENI:                    cfg.MaxENI,
-        MinENI:                    cfg.MinENI,
-        EniCapRatio:               cfg.EniCapRatio,
-        EniCapShift:               cfg.EniCapShift,
-        SecurityGroups:            cfg.GetSecurityGroups(),
-        VSwitchSelectionPolicy:    cfg.VSwitchSelectionPolicy,
-        EnableENITrunking:         cfg.EnableENITrunking,
-        ENICapPolicy:              cfg.ENICapPolicy,
-        DisableDevicePlugin:       cfg.DisableDevicePlugin,
-        WaitTrunkENI:              cfg.WaitTrunkENI,
-        DisableSecurityGroupCheck: cfg.DisableSecurityGroupCheck,
-    }
-    if len(poolConfig.SecurityGroups) > 5 {
-        return nil, fmt.Errorf("security groups should not be more than 5, current %d", len(poolConfig.SecurityGroups))
-    }
-    ins := aliyun.GetInstanceMeta()
-    zone := ins.ZoneID
-    if cfg.VSwitches != nil {
-        zoneVswitchs, ok := cfg.VSwitches[zone]
-        if ok && len(zoneVswitchs) > 0 {
-            poolConfig.VSwitch = cfg.VSwitches[zone]
-        }
-    }
-    if len(poolConfig.VSwitch) == 0 {
-        poolConfig.VSwitch = []string{ins.VSwitchID}
-    }
-    poolConfig.ENITags = cfg.ENITags
-    poolConfig.VPC = ins.VPCID
-    poolConfig.InstanceID = ins.InstanceID
-
-    if cfg.IPAMType == types.IPAMTypeCRD {
-        poolConfig.MaxPoolSize = 0
-        poolConfig.MinPoolSize = 0
-        poolConfig.MaxENI = 0
-        poolConfig.MinENI = 0
-    }
-    return poolConfig, nil
-}
-
 func parseExtraRoute(routes []podENITypes.Route) []*rpc.Route {
     if routes == nil {
         return nil
diff --git a/daemon/eni-multi-ip.go b/daemon/eni-multi-ip.go
index 12139687..0f42d1e8 100644
--- a/daemon/eni-multi-ip.go
+++ b/daemon/eni-multi-ip.go
@@ -9,8 +9,6 @@ import (
     "sync"
     "time"
 
-    "github.com/AliyunContainerService/terway/deviceplugin"
-    "github.com/AliyunContainerService/terway/pkg/aliyun"
     apiErr "github.com/AliyunContainerService/terway/pkg/aliyun/client/errors"
     terwayIP "github.com/AliyunContainerService/terway/pkg/ip"
     "github.com/AliyunContainerService/terway/pkg/ipam"
@@ -628,7 +626,7 @@ func (f *eniIPFactory) ListResource() (map[string]types.NetworkResource, error)
 
 func (f *eniIPFactory) Reconcile() {
     // check security group
-    err := f.eniFactory.ecs.CheckEniSecurityGroup(context.Background(), f.eniFactory.securityGroups)
+    err := f.eniFactory.ecs.CheckEniSecurityGroup(context.Background(), f.eniFactory.securityGroupIDs)
     if err != nil {
         _ = tracing.RecordNodeEvent(corev1.EventTypeWarning, "ResourceInvalid", fmt.Sprintf("eni has misconfiged security group. %s", err.Error()))
     }
@@ -920,83 +918,18 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
     factory := &eniIPFactory{
         name:         factoryNameENIIP,
         eniFactory:   eniFactory,
-        enableTrunk:  poolConfig.EnableENITrunking,
+        enableTrunk:  poolConfig.TrunkENIID != "",
         enis:         []*ENI{},
         eniOperChan:  make(chan struct{}, maxEniOperating),
         ipResultChan: make(chan *ENIIP, maxIPBacklog),
         ipFamily:     ipFamily,
-    }
-    var capacity, maxEni, memberENIPod, adapters int
-
-    if !poolConfig.DisableDevicePlugin {
-        limit, err := aliyun.GetLimit(ecs, aliyun.GetInstanceMeta().InstanceType)
-        if err != nil {
-            return nil, fmt.Errorf("error get max eni for eniip factory, %w", err)
-        }
-        maxEni = limit.Adapters
-        maxEni = int(float64(maxEni)*poolConfig.EniCapRatio) + poolConfig.EniCapShift - 1
-
-        ipPerENI := limit.IPv4PerAdapter
-        if utils.IsWindowsOS() {
-            // NB(thxCode): don't assign the primary IP of one assistant eni.
-            ipPerENI--
-        }
-        factory.eniMaxIP = ipPerENI
-        factory.nodeMaxENI = maxEni
-
-        if poolConfig.MaxENI != 0 && poolConfig.MaxENI < maxEni {
-            maxEni = poolConfig.MaxENI
-        }
-        capacity = maxEni * ipPerENI
-        if capacity < 0 {
-            capacity = 0
-        }
-        memberENIPod = limit.MemberAdapterLimit
-        if memberENIPod < 0 {
-            memberENIPod = 0
-        }
-
-        factory.maxENI = make(chan struct{}, maxEni)
-
-        if poolConfig.MinENI != 0 {
-            poolConfig.MinPoolSize = poolConfig.MinENI * ipPerENI
-        }
-
-        if poolConfig.MinPoolSize > capacity {
-            eniIPLog.Infof("min pool size bigger than node capacity, set min pool size to capacity")
-            poolConfig.MinPoolSize = capacity
-        }
-
-        if poolConfig.MaxPoolSize > capacity {
-            eniIPLog.Infof("max pool size bigger than node capacity, set max pool size to capacity")
-            poolConfig.MaxPoolSize = capacity
-        }
-
-        if poolConfig.MinPoolSize > poolConfig.MaxPoolSize {
-            eniIPLog.Warnf("min_pool_size bigger: %v than max_pool_size: %v, set max_pool_size to the min_pool_size",
-                poolConfig.MinPoolSize, poolConfig.MaxPoolSize)
-            poolConfig.MaxPoolSize = poolConfig.MinPoolSize
-        }
-
-        adapters = limit.Adapters
-    } else {
-        capacity = 1
-        maxEni = 1
-        memberENIPod = 0
-        adapters = 0
-    }
-
-    if poolConfig.WaitTrunkENI {
-        logger.DefaultLogger.Infof("waitting trunk eni ready")
-        factory.trunkOnEni, err = k8s.WaitTrunkReady()
-        if err != nil {
-            return nil, err
-        }
-        logger.DefaultLogger.Infof("trunk eni found %s", factory.trunkOnEni)
+        maxENI:       make(chan struct{}, poolConfig.MaxENI),
+        eniMaxIP:     poolConfig.MaxIPPerENI,
+        nodeMaxENI:   poolConfig.MaxENI,
     }
 
     // eniip factory metrics
-    factory.metricENICount = metric.ENIIPFactoryENICount.WithLabelValues(factory.name, fmt.Sprint(maxEni))
+    factory.metricENICount = metric.ENIIPFactoryENICount.WithLabelValues(factory.name, fmt.Sprint(poolConfig.MaxENI))
 
     var trunkENI *types.ENI
     poolCfg := pool.Config{
         Name:               poolNameENIIP,
@@ -1004,7 +937,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
         MaxIdle:            poolConfig.MaxPoolSize,
         MinIdle:            poolConfig.MinPoolSize,
         Factory:            factory,
-        Capacity:           capacity,
+        Capacity:           poolConfig.Capacity,
         IPConditionHandler: k8s.PatchNodeIPResCondition,
         Initializer: func(holder pool.ResourceHolder) error {
             ctx := context.Background()
@@ -1014,23 +947,17 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
                 return fmt.Errorf("error get attach ENI on pool init, %w", err)
             }
 
-            if factory.enableTrunk && memberENIPod > 0 {
+            if factory.trunkOnEni != "" {
+                found := false
                 for _, eni := range enis {
-                    if eni.Trunk {
-                        factory.trunkOnEni = eni.ID
-                    }
-                    if eni.ID == factory.trunkOnEni {
+                    if eni.Trunk && factory.trunkOnEni == eni.ID {
+                        found = true
                         trunkENI = eni
+                        break
                     }
                 }
-                if factory.trunkOnEni == "" && len(enis) < adapters-1 {
-                    trunkENIRes, err := factory.eniFactory.CreateWithIPCount(1, true)
-                    if err != nil {
-                        return errors.Wrapf(err, "error init trunk eni")
-                    }
-                    trunkENI, _ = trunkENIRes[0].(*types.ENI)
-                    factory.trunkOnEni = trunkENI.ID
-                    enis = append(enis, trunkENI)
+                if !found {
+                    return fmt.Errorf("trunk eni %s not found", factory.trunkOnEni)
                 }
             }
 
@@ -1072,7 +999,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
                     poolENI.ips = append(poolENI.ips, &ENIIP{
                         ENIIP: eniIP,
                     })
-                    metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(maxEni)).Inc()
+                    metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(poolConfig.MaxENI)).Inc()
 
                     if !ok {
                         holder.AddIdle(eniIP)
@@ -1098,7 +1025,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
                     poolENI.ips = append(poolENI.ips, &ENIIP{
                         ENIIP: eniIP,
                     })
-                    metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(maxEni)).Inc()
+                    metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(poolConfig.MaxENI)).Inc()
 
                     holder.AddInuse(eniIP, podInfoKey(res.podInfo.Namespace, res.podInfo.Name))
 
@@ -1125,7 +1052,7 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
                     poolENI.ips = append(poolENI.ips, &ENIIP{
                         ENIIP: eniIP,
                     })
-                    metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(maxEni)).Inc()
+                    metric.ENIIPFactoryIPCount.WithLabelValues(factory.name, poolENI.MAC, fmt.Sprint(poolConfig.MaxENI)).Inc()
 
                     if ipFamily.IPv4 && ipFamily.IPv6 && (unUsed.IPv6 == nil || unUsed.IPv4 == nil) {
                         holder.AddInvalid(eniIP)
@@ -1155,30 +1082,6 @@ func newENIIPResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, k8s Kub
         pool: p,
     }
 
-    //init device plugin for ENI
-    if poolConfig.EnableENITrunking && factory.trunkOnEni != "" && !poolConfig.DisableDevicePlugin {
-        dp := deviceplugin.NewENIDevicePlugin(memberENIPod, deviceplugin.ENITypeMember)
-        go dp.Serve()
-
-        err = k8s.PatchTrunkInfo(factory.trunkOnEni)
-        if err != nil {
-            return nil, errors.Wrapf(err, "error patch trunk info on node")
-        }
-    }
-
-    if capacity > 0 {
-        err = k8s.PatchAvailableIPs(types.NormalIPTypeIPs, capacity)
-        if err != nil {
-            return nil, errors.Wrapf(err, "error patch available ips")
-        }
-    }
-    if memberENIPod > 0 && factory.trunkOnEni != "" && poolConfig.EnableENITrunking {
-        err = k8s.PatchAvailableIPs(types.MemberENIIPTypeIPs, memberENIPod)
-        if err != nil {
-            return nil, errors.Wrapf(err, "error patch available ips")
-        }
-    }
-
     _ = tracing.Register(tracing.ResourceTypeFactory, factory.name, factory)
     return mgr, nil
 }
diff --git a/daemon/eni.go b/daemon/eni.go
index 7dd50e47..76547354 100644
--- a/daemon/eni.go
+++ b/daemon/eni.go
@@ -13,8 +13,6 @@ import (
 
     apiErr "github.com/AliyunContainerService/terway/pkg/aliyun/client/errors"
 
-    "github.com/AliyunContainerService/terway/deviceplugin"
-    "github.com/AliyunContainerService/terway/pkg/aliyun"
     "github.com/AliyunContainerService/terway/pkg/ipam"
     "github.com/AliyunContainerService/terway/pkg/logger"
     "github.com/AliyunContainerService/terway/pkg/pool"
@@ -56,57 +54,14 @@ func newENIResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, allocated
 
     _ = tracing.Register(tracing.ResourceTypeFactory, factoryNameENI, factory)
 
-    var capacity, memberLimit int
-
-    if !poolConfig.DisableDevicePlugin {
-        limit, err := aliyun.GetLimit(ecs, aliyun.GetInstanceMeta().InstanceType)
-        if err != nil {
-            return nil, fmt.Errorf("error get max eni for eni factory, %w", err)
-        }
-        capacity = limit.Adapters
-        capacity = int(float64(capacity)*poolConfig.EniCapRatio) + poolConfig.EniCapShift - 1
-
-        if poolConfig.MaxENI != 0 && poolConfig.MaxENI < capacity {
-            capacity = poolConfig.MaxENI
-        }
-
-        if poolConfig.MaxPoolSize > capacity {
-            poolConfig.MaxPoolSize = capacity
-        }
-
-        if poolConfig.MinENI != 0 {
-            poolConfig.MinPoolSize = poolConfig.MinENI
-        }
-
-        memberLimit = limit.MemberAdapterLimit
-        if poolConfig.ENICapPolicy == types.ENICapPolicyPreferTrunk {
-            memberLimit = limit.MaxMemberAdapterLimit
-        }
-    } else {
-        // NB(l1b0k): adapt DisableDevicePlugin func, will refactor latter
-        capacity = 1
-        memberLimit = 0
-        poolConfig.MaxPoolSize = 1
-        poolConfig.MinPoolSize = 0
-    }
-
     var trunkENI *types.ENI
-    if poolConfig.WaitTrunkENI {
-        logger.DefaultLogger.Infof("waitting trunk eni ready")
-        factory.trunkOnEni, err = k8s.WaitTrunkReady()
-        if err != nil {
-            return nil, err
-        }
-        logger.DefaultLogger.Infof("trunk eni found %s", factory.trunkOnEni)
-    }
-
     poolCfg := pool.Config{
         Name:               poolNameENI,
         Type:               typeNameENI,
         MaxIdle:            poolConfig.MaxPoolSize,
         MinIdle:            poolConfig.MinPoolSize,
-        Capacity:           capacity,
+        Capacity:           poolConfig.Capacity,
         Factory:            factory,
         IPConditionHandler: k8s.PatchNodeIPResCondition,
         Initializer: func(holder pool.ResourceHolder) error {
@@ -119,28 +74,20 @@ func newENIResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, allocated
                 return fmt.Errorf("error get attach ENI on pool init, %w", err)
             }
 
-            if factory.enableTrunk && memberLimit > 0 {
-                logger.DefaultLogger.Infof("lookup trunk eni")
+            if factory.trunkOnEni != "" {
+                found := false
                 for _, eni := range enis {
-                    if eni.Trunk {
-                        logger.DefaultLogger.Infof("find trunk eni %s", eni.ID)
-                        factory.trunkOnEni = eni.ID
-                    }
-                    if eni.ID == factory.trunkOnEni {
+                    if eni.Trunk && factory.trunkOnEni == eni.ID {
+                        found = true
                         trunkENI = eni
-                        eni.Trunk = true
+                        break
                     }
                 }
-                if factory.trunkOnEni == "" && len(enis) < capacity-1 {
-                    trunkENIRes, err := factory.CreateWithIPCount(1, true)
-                    if err != nil {
-                        return errors.Wrapf(err, "error init trunk eni")
-                    }
-                    trunkENI, _ = trunkENIRes[0].(*types.ENI)
-                    factory.trunkOnEni = trunkENI.ID
-                    enis = append(enis, trunkENI)
+                if !found {
+                    return fmt.Errorf("trunk eni %s not found", factory.trunkOnEni)
                 }
             }
+
             for _, e := range enis {
                 if ipFamily.IPv6 {
                     _, ipv6, err := ecs.GetENIIPs(ctx, e.MAC)
@@ -169,37 +116,6 @@ func newENIResourceManager(poolConfig *types.PoolConfig, ecs ipam.API, allocated
         trunkENI: trunkENI,
     }
 
-    if poolConfig.DisableDevicePlugin {
-        return mgr, nil
-    }
-
-    //init deviceplugin for ENI
-    realCap := 0
-    eniType := deviceplugin.ENITypeENI
-    if !poolConfig.EnableENITrunking {
-        realCap = capacity
-    }
-
-    // report only trunk is created
-    if poolConfig.EnableENITrunking && factory.trunkOnEni != "" {
-        eniType = deviceplugin.ENITypeMember
-        realCap = memberLimit
-        err = k8s.PatchTrunkInfo(factory.trunkOnEni)
-        if err != nil {
-            return nil, errors.Wrapf(err, "error patch trunk info on node")
-        }
-    }
-
-    if capacity > 0 {
-        err = k8s.PatchAvailableIPs(types.NormalIPTypeIPs, capacity)
-        if err != nil {
-            return nil, errors.Wrapf(err, "error patch available ips")
-        }
-    }
-
-    logger.DefaultLogger.Infof("set deviceplugin cap %d", realCap)
-    dp := deviceplugin.NewENIDevicePlugin(realCap, eniType)
-    go dp.Serve()
-
     return mgr, nil
 }
 
@@ -247,9 +163,9 @@ type eniFactory struct {
     name                   string
     enableTrunk            bool
    trunkOnEni             string
-    switches               []string
+    vSwitchOptions         []string
     eniTags                map[string]string
-    securityGroups         []string
+    securityGroupIDs       []string
     instanceID             string
     ecs                    ipam.API
     vswitchCnt             []vswitch
@@ -260,19 +176,19 @@ func newENIFactory(poolConfig *types.PoolConfig, ecs ipam.API) (*eniFactory, error) {
-    if len(poolConfig.SecurityGroups) == 0 {
+    if len(poolConfig.SecurityGroupIDs) == 0 {
         securityGroups, err := ecs.GetAttachedSecurityGroups(context.Background(), poolConfig.InstanceID)
         if err != nil {
             return nil, errors.Wrapf(err, "error get security group on factory init")
         }
-        poolConfig.SecurityGroups = securityGroups
+        poolConfig.SecurityGroupIDs = securityGroups
     }
     return &eniFactory{
         name:                   factoryNameENI,
-        switches:               poolConfig.VSwitch,
+        vSwitchOptions:         poolConfig.VSwitchOptions,
         eniTags:                poolConfig.ENITags,
-        securityGroups:         poolConfig.SecurityGroups,
-        enableTrunk:            poolConfig.EnableENITrunking,
+        securityGroupIDs:       poolConfig.SecurityGroupIDs,
+        enableTrunk:            poolConfig.TrunkENIID != "",
         instanceID:             poolConfig.InstanceID,
         ecs:                    ecs,
         vswitchCnt:             make([]vswitch, 0),
@@ -285,18 +201,18 @@ func (f *eniFactory) GetVSwitches() ([]vswitch, error) {
 
     var vSwitches []vswitch
 
-    vswCnt := len(f.switches)
+    vswCnt := len(f.vSwitchOptions)
     // If there is ONLY ONE vswitch, then there is no need for ordering per switches' available IP counts,
     // return the slice with only this vswitch.
     if vswCnt == 1 {
         return []vswitch{{
-            id:      f.switches[0],
+            id:      f.vSwitchOptions[0],
             ipCount: 0,
         }}, nil
     }
 
     if f.vswitchSelectionPolicy == types.VSwitchSelectionPolicyRandom {
-        vSwitches = lo.Map(f.switches, func(item string, index int) vswitch {
+        vSwitches = lo.Map(f.vSwitchOptions, func(item string, index int) vswitch {
             return vswitch{
                 id:      item,
                 ipCount: 0,
@@ -320,7 +236,7 @@ func (f *eniFactory) GetVSwitches() ([]vswitch, error) {
     if (len(f.vswitchCnt) == 0 && f.tsExpireAt.IsZero()) || start.After(f.tsExpireAt) {
         f.vswitchCnt = make([]vswitch, 0)
         // Loop vsw slice to get each vsw's available IP count.
-        for _, vswID := range f.switches {
+        for _, vswID := range f.vSwitchOptions {
             var vsw *vpc.VSwitch
             vsw, err = f.ecs.DescribeVSwitchByID(context.Background(), vswID)
             if err != nil {
@@ -347,7 +263,7 @@ func (f *eniFactory) GetVSwitches() ([]vswitch, error) {
             return f.vswitchCnt[i].ipCount > f.vswitchCnt[j].ipCount
         })
     } else {
-        vSwitches = lo.Map(f.switches, func(item string, index int) vswitch {
+        vSwitches = lo.Map(f.vSwitchOptions, func(item string, index int) vswitch {
             return vswitch{
                 id:      item,
                 ipCount: 0,
@@ -373,7 +289,7 @@ func (f *eniFactory) CreateWithIPCount(count int, trunk bool) ([]types.NetworkRe
     for k, v := range f.eniTags {
         tags[k] = v
     }
-    eni, err := f.ecs.AllocateENI(context.Background(), vSwitches[0].id, f.securityGroups, f.instanceID, trunk, count, tags)
+    eni, err := f.ecs.AllocateENI(context.Background(), vSwitches[0].id, f.securityGroupIDs, f.instanceID, trunk, count, tags)
     if err != nil {
         if strings.Contains(err.Error(), apiErr.InvalidVSwitchIDIPNotEnough) {
             reportIPExhaustive := false
@@ -395,7 +311,7 @@ func (f *eniFactory) CreateWithIPCount(count int, trunk bool) ([]types.NetworkRe
         } else if strings.Contains(err.Error(), apiErr.ErrSecurityGroupInstanceLimitExceed) {
             return nil, &types.IPInsufficientError{
                 Err:    err,
-                Reason: fmt.Sprintf("security group %v exceeded max ip limit", f.securityGroups)}
+                Reason: fmt.Sprintf("security group %v exceeded max ip limit", f.securityGroupIDs)}
         }
         return nil, err
     }
@@ -413,7 +329,7 @@ func (f *eniFactory) Dispose(resource types.NetworkResource) error {
 func (f *eniFactory) Config() []tracing.MapKeyValueEntry {
     config := []tracing.MapKeyValueEntry{
         {Key: tracingKeyName, Value: f.name},
-        {Key: tracingKeyVSwitches, Value: strings.Join(f.switches, " ")},
+        {Key: tracingKeyVSwitches, Value: strings.Join(f.vSwitchOptions, " ")},
         {Key: tracingKeyVSwitchSelectionPolicy, Value: f.vswitchSelectionPolicy},
     }
 
@@ -476,7 +392,7 @@ func (f *eniFactory) Reconcile() {
     if f.disableSecurityGroupCheck {
         return
     }
-    err := f.ecs.CheckEniSecurityGroup(context.Background(), f.securityGroups)
+    err := f.ecs.CheckEniSecurityGroup(context.Background(), f.securityGroupIDs)
     if err != nil {
         _ = tracing.RecordNodeEvent(corev1.EventTypeWarning, "ResourceInvalid", fmt.Sprintf("eni has misconfiged security group. %s", err.Error()))
     }
diff --git a/daemon/k8s.go b/daemon/k8s.go
index e10b673b..2486976d 100644
--- a/daemon/k8s.go
+++ b/daemon/k8s.go
@@ -63,8 +63,8 @@ type Kubernetes interface {
     GetNodeCidr() *types.IPNetSet
     SetNodeAllocatablePod(count int) error
     PatchEipInfo(info *types.PodInfo) error
-    PatchTrunkInfo(trunkEni string) error
-    PatchAvailableIPs(ipType types.PodIPTypeIPs, count int) error
+
+    PatchNodeAnnotations(anno map[string]string) error
     PatchPodIPInfo(info *types.PodInfo, ips string) error
     PatchNodeIPResCondition(status corev1.ConditionStatus, reason, message string) error
     WaitPodENIInfo(info *types.PodInfo) (podEni *podENITypes.PodENI, err error)
@@ -73,9 +73,10 @@ type Kubernetes interface {
     RecordPodEvent(podName, podNamespace, eventType, reason, message string) error
     GetNodeDynamicConfigLabel() string
     GetDynamicConfigWithName(name string) (string, error)
-    SetSvcCidr(svcCidr *types.IPNetSet) error
     SetCustomStatefulWorkloadKinds(kinds []string) error
     WaitTrunkReady() (string, error)
+
+    GetTrunkID() string
 }
 
 type k8s struct {
@@ -136,15 +137,11 @@ func (k *k8s) PatchNodeIPResCondition(status corev1.ConditionStatus, reason, mes
     return nil
 }
 
-func (k *k8s) PatchAvailableIPs(ipType types.PodIPTypeIPs, count int) error {
-    return k.patchNodeAnno(string(ipType), strconv.Itoa(count))
-}
-
-func (k *k8s) PatchTrunkInfo(trunkEni string) error {
-    return k.patchNodeAnno(types.TrunkOn, trunkEni)
-}
+func (k *k8s) PatchNodeAnnotations(anno map[string]string) error {
+    if len(anno) == 0 {
+        return nil
+    }
 
-func (k *k8s) patchNodeAnno(key, val string) error {
     node, err := k.client.CoreV1().Nodes().Get(context.TODO(), k.nodeName, metav1.GetOptions{
         ResourceVersion: "0",
     })
@@ -152,21 +149,31 @@ func (k *k8s) patchNodeAnno(key, val string) error {
         return err
     }
 
-    if node.GetAnnotations() != nil {
-        if preVal, ok := node.GetAnnotations()[key]; ok {
-            if preVal == val {
-                return nil
-            }
+    satisfy := true
+    for key, val := range anno {
+        vv, ok := node.Annotations[key]
+        if !ok {
+            satisfy = false
+            break
+        }
+        if vv != val {
+            satisfy = false
+            break
         }
     }
 
-    node.Annotations[key] = val
-    annotationPatchStr := fmt.Sprintf(`{"metadata":{"annotations":{"%v":"%v"}}}`, key, val)
-    _, err = k.client.CoreV1().Nodes().Patch(context.TODO(), k.nodeName, apiTypes.MergePatchType, []byte(annotationPatchStr), metav1.PatchOptions{})
+    if satisfy {
+        return nil
+    }
+
+    out, err := json.Marshal(anno)
     if err != nil {
         return err
     }
-    return nil
+
+    annotationPatchStr := fmt.Sprintf(`{"metadata":{"annotations":%s}}`, string(out))
+    _, err = k.client.CoreV1().Nodes().Patch(context.TODO(), k.nodeName, apiTypes.MergePatchType, []byte(annotationPatchStr), metav1.PatchOptions{})
+    return err
 }
 
 func (k *k8s) SetCustomStatefulWorkloadKinds(kinds []string) error {
@@ -185,7 +192,7 @@ func (k *k8s) SetCustomStatefulWorkloadKinds(kinds []string) error {
     return nil
 }
 
-func (k *k8s) SetSvcCidr(svcCidr *types.IPNetSet) error {
+func (k *k8s) setSvcCIDR(svcCidr *types.IPNetSet) error {
     k.Lock()
     defer k.Unlock()
 
@@ -306,6 +313,10 @@ func (k *k8s) WaitTrunkReady() (string, error) {
     return id, err
 }
 
+func (k *k8s) GetTrunkID() string {
+    return k.node.Annotations[types.TrunkOn]
+}
+
 // newK8S return Kubernetes service by pod spec and daemon mode
 func newK8S(daemonMode string, globalConfig *daemon.Config) (Kubernetes, error) {
     restConfig := ctrl.GetConfigOrDie()
@@ -374,6 +385,17 @@ func newK8S(daemonMode string, globalConfig *daemon.Config) (Kubernetes, error)
         apiConnTime: time.Now(),
         Locker:      &sync.RWMutex{},
     }
+
+    svcCIDR := &types.IPNetSet{}
+    cidrs := strings.Split(globalConfig.ServiceCIDR, ",")
+
+    for _, cidr := range cidrs {
+        svcCIDR.SetIPNet(cidr)
+    }
+    err = k8sObj.setSvcCIDR(svcCIDR)
+    if err != nil {
+        return nil, err
+    }
     podENICli, err := v1beta1.NewForConfig(restConfig)
     if err != nil {
         return nil, errors.Wrapf(err, "error init pod ENI client")
diff --git a/daemon/server.go b/daemon/server.go
index 048de85a..cc7c0010 100644
--- a/daemon/server.go
+++ b/daemon/server.go
@@ -95,6 +95,7 @@ func Run(ctx context.Context, socketFilePath, debugSocketListen, configFilePath,
     }
 
     go func() {
+        serviceLog.Infof("start serving on %s", socketFilePath)
         err = grpcServer.Serve(l)
         if err != nil {
             log.Errorf("error start grpc server: %v", err)
diff --git a/pkg/controller/vswitch/vswitch.go b/pkg/controller/vswitch/vswitch.go
index 4d116b6f..6d60e09a 100644
--- a/pkg/controller/vswitch/vswitch.go
+++ b/pkg/controller/vswitch/vswitch.go
@@ -20,11 +20,13 @@ import (
     "context"
     "fmt"
     "math/rand"
+    "sort"
     "time"
 
-    "github.com/AliyunContainerService/terway/pkg/aliyun/client"
     "k8s.io/apimachinery/pkg/util/cache"
     "sigs.k8s.io/controller-runtime/pkg/log"
+
+    "github.com/AliyunContainerService/terway/pkg/aliyun/client"
 )
 
 // Switch hole all switch info from both terway config and podNetworking
@@ -37,6 +39,12 @@ type Switch struct {
     IPv6CIDR string
 }
 
+type ByAvailableIP []Switch
+
+func (a ByAvailableIP) Len() int           { return len(a) }
+func (a ByAvailableIP) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a ByAvailableIP) Less(i, j int) bool { return a[i].AvailableIPCount > a[j].AvailableIPCount }
+
 // SwitchPool contain all vSwitches
 type SwitchPool struct {
     cache *cache.LRUExpireCache
@@ -60,9 +68,37 @@ func (s *SwitchPool) GetOne(ctx context.Context, client client.VSwitch, zone str
     selectOptions := &SelectOptions{}
     selectOptions.ApplyOptions(opts)
 
-    if selectOptions.VSwitchSelectPolicy == VSwitchSelectionPolicyRandom {
-        rand.Seed(time.Now().UnixNano())
+    switch selectOptions.VSwitchSelectPolicy {
+    case VSwitchSelectionPolicyRandom:
         rand.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })
+    case VSwitchSelectionPolicyMost:
+        // lookup all vsw in cache and get one matched
+        // try sort the vsw
+
+        var byAvailableIP ByAvailableIP
+        for _, id := range ids {
+            vsw, err := s.GetByID(ctx, client, id)
+            if err != nil {
+                log.FromContext(ctx).Error(err, "get vSwitch", "id", id)
+                continue
+            }
+
+            if vsw.Zone != zone {
+                continue
+            }
+            if vsw.AvailableIPCount == 0 {
+                continue
+            }
+            byAvailableIP = append(byAvailableIP, *vsw)
+        }
+
+        sort.Sort(byAvailableIP)
+        // keep the below logic untouched
+        newOrder := make([]string, 0, len(byAvailableIP))
+        for _, vsw := range byAvailableIP {
+            newOrder = append(newOrder, vsw.ID)
+        }
+        ids = newOrder
     }
 
     // lookup all vsw in cache and get one matched
@@ -117,6 +153,16 @@ func (s *SwitchPool) GetByID(ctx context.Context, client client.VSwitch, id stri
     return sw, nil
 }
 
+func (s *SwitchPool) Block(id string) {
+    v, ok := s.cache.Get(id)
+    if !ok {
+        return
+    }
+    vsw := *(v.(*Switch))
+    vsw.AvailableIPCount = 0
+    s.cache.Add(id, &vsw, s.ttl)
+}
+
 // Add Switch to cache. Test purpose.
 func (s *SwitchPool) Add(sw *Switch) {
     s.cache.Add(sw.ID, sw, s.ttl)
@@ -133,6 +179,7 @@ type SelectionPolicy string
 const (
     VSwitchSelectionPolicyOrdered SelectionPolicy = "ordered"
     VSwitchSelectionPolicyRandom  SelectionPolicy = "random"
+    VSwitchSelectionPolicyMost    SelectionPolicy = "most"
 )
 
 type SelectOption interface {
diff --git a/types/config.go b/types/config.go
index 1eb1e2df..02d500bc 100644
--- a/types/config.go
+++ b/types/config.go
@@ -2,22 +2,23 @@ package types
 
 // PoolConfig configuration of pool and resource factory
 type PoolConfig struct {
-    MaxPoolSize            int
-    MinPoolSize            int
-    MinENI                 int
-    MaxENI                 int
-    VPC                    string
-    Zone                   string
-    VSwitch                []string
-    ENITags                map[string]string
-    SecurityGroups         []string
-    InstanceID             string
-    EniCapRatio            float64
-    EniCapShift            int
-    VSwitchSelectionPolicy string
-    EnableENITrunking      bool
-    ENICapPolicy           ENICapPolicy
-    DisableDevicePlugin    bool
-    WaitTrunkENI           bool
+    Capacity     int // the maximum number of resources the pool can hold
+    MaxENI       int // the maximum number of ENIs terway can create (the primary ENI is already excluded)
+    MaxMemberENI int // the maximum number of member ENIs that can be created
+    MaxIPPerENI  int
+
+    MaxPoolSize int
+    MinPoolSize int
+
+    ZoneID           string
+    VSwitchOptions   []string
+    ENITags          map[string]string
+    SecurityGroupIDs []string
+    InstanceID       string
+
+    VSwitchSelectionPolicy    string
+    DisableSecurityGroupCheck bool
+
+    TrunkENIID string
 }
diff --git a/types/daemon/config.go b/types/daemon/config.go
index e8a8279b..b300bde0 100644
--- a/types/daemon/config.go
+++ b/types/daemon/config.go
@@ -109,6 +109,11 @@ func (c *Config) Validate() error {
     default:
         return fmt.Errorf("unsupported ipStack %s in configMap", c.IPStack)
     }
+
+    if len(c.SecurityGroups) > 5 {
+        return fmt.Errorf("security groups should not be more than 5, current %d", len(c.SecurityGroups))
+    }
+
     return nil
 }
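Reviewer note (explanatory, not part of the patch): the pool-sizing arithmetic that the new getPoolConfig centralizes, previously duplicated in newENIIPResourceManager and newENIResourceManager, reduces to deriving a node capacity from the instance limits and clamping the configured pool bounds. The standalone sketch below mirrors the ENIMultiIP branch under simplified assumptions (the Windows primary-IP adjustment and the CRD IPAM override are omitted); the poolSizes helper and the sample numbers are illustrative and do not appear in the patch.

package main

import "fmt"

// poolSizes mirrors, in simplified form, the ENIMultiIP branch of getPoolConfig:
// derive the node capacity from the instance limits, then clamp the configured
// pool bounds so that MinPoolSize <= MaxPoolSize <= capacity.
func poolSizes(adapters, ipv4PerAdapter, cfgMaxENI, cfgMinENI, cfgMaxPool int, capRatio float64, capShift int) (capacity, maxENI, maxPool, minPool int) {
    maxENI = int(float64(adapters)*capRatio) + capShift - 1 // the primary ENI is excluded
    if cfgMaxENI > 0 && cfgMaxENI < maxENI {
        maxENI = cfgMaxENI
    }

    capacity = maxENI * ipv4PerAdapter

    maxPool = cfgMaxPool
    if maxPool > capacity {
        maxPool = capacity
    }

    if cfgMinENI > 0 {
        minPool = cfgMinENI * ipv4PerAdapter
    }
    if minPool > maxPool {
        minPool = maxPool
    }
    return capacity, maxENI, maxPool, minPool
}

func main() {
    // Hypothetical instance type: 4 adapters, 10 IPv4 addresses per adapter,
    // eni_cap_ratio=1, eni_cap_shift=0, max_eni=0, min_eni=1, max_pool_size=25.
    capacity, maxENI, maxPool, minPool := poolSizes(4, 10, 0, 1, 25, 1.0, 0)
    fmt.Println(capacity, maxENI, maxPool, minPool) // 30 3 25 10
}

As in the patch, MaxPoolSize and MinPoolSize feed the pool's MaxIdle and MinIdle, while Capacity, MaxENI and MaxMemberENI are carried in types.PoolConfig for the device plugin registration and the node annotations.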