Skip to content

Commit

Permalink
Merge pull request #520 from l1b0k/feat/daemon
Browse files Browse the repository at this point in the history
optimize daemon setup
  • Loading branch information
BSWANG authored Aug 17, 2023
2 parents 32f66fb + abfdaab commit 18a90db
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 334 deletions.
126 changes: 126 additions & 0 deletions daemon/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package daemon

import (
"github.com/AliyunContainerService/terway/pkg/aliyun"
"github.com/AliyunContainerService/terway/pkg/utils"
"github.com/AliyunContainerService/terway/types"
"github.com/AliyunContainerService/terway/types/daemon"
)

// getDynamicConfig returns (config, label, error) specified in node
// ("", "", nil) for no dynamic config for this node
func getDynamicConfig(k8s Kubernetes) (string, string, error) {
Expand All @@ -12,3 +19,122 @@ func getDynamicConfig(k8s Kubernetes) (string, string, error) {

return cfg, label, err
}

// getPoolConfig derives the pool configuration for this node from the daemon
// config, the daemon mode and the instance limits.
//
// For daemonModeVPC and daemonModeENIOnly the capacity is counted in ENIs and
// every ENI carries one IP. For daemonModeENIMultiIP the capacity is counted
// in IPs (ENIs * IPs per ENI). Any other mode leaves capacity, MaxENI and
// MaxMemberENI at zero.
//
// When cfg.IPAMType is types.IPAMTypeCRD the pool sizes are forced to zero,
// and the instance metadata (zone, instance id, vSwitch options) is only
// looked up if the device plugin is still enabled.
//
// Side effect: cfg.ENITags is created if nil and always receives the creator
// tag; the returned PoolConfig shares that same map.
//
// The returned error is currently always nil.
func getPoolConfig(cfg *daemon.Config, daemonMode string, limit *aliyun.Limits) (*types.PoolConfig, error) {
	poolConfig := &types.PoolConfig{
		SecurityGroupIDs:          cfg.GetSecurityGroups(),
		VSwitchSelectionPolicy:    cfg.VSwitchSelectionPolicy,
		DisableSecurityGroupCheck: cfg.DisableSecurityGroupCheck,
	}

	// Writing to a nil map panics, so make sure the tag map exists before
	// stamping the creator tag on it.
	if cfg.ENITags == nil {
		cfg.ENITags = make(map[string]string)
	}
	cfg.ENITags[types.NetworkInterfaceTagCreatorKey] = types.NetworkInterfaceTagCreatorValue
	poolConfig.ENITags = cfg.ENITags

	capacity := 0
	maxENI := 0
	maxMemberENI := 0

	switch daemonMode {
	case daemonModeVPC, daemonModeENIOnly:
		maxENI = poolMaxENI(cfg, limit)
		capacity = maxENI

		applyPoolSizeBounds(poolConfig, cfg.MaxPoolSize, cfg.MinENI, capacity)

		maxMemberENI = limit.MemberAdapterLimit
		if cfg.ENICapPolicy == types.ENICapPolicyPreferTrunk {
			maxMemberENI = limit.MaxMemberAdapterLimit
		}

		poolConfig.MaxIPPerENI = 1
	case daemonModeENIMultiIP:
		maxENI = poolMaxENI(cfg, limit)

		ipPerENI := limit.IPv4PerAdapter
		if utils.IsWindowsOS() {
			// NB(thxCode): don't assign the primary IP of one assistant eni.
			ipPerENI--
		}
		// A negative per-ENI IP count is meaningless and would turn the
		// capacity negative; treat it as "no IP available".
		if ipPerENI < 0 {
			ipPerENI = 0
		}

		capacity = maxENI * ipPerENI

		// The minimum is configured in ENIs, the pool is sized in IPs.
		minPoolSize := 0
		if cfg.MinENI > 0 {
			minPoolSize = cfg.MinENI * ipPerENI
		}
		applyPoolSizeBounds(poolConfig, cfg.MaxPoolSize, minPoolSize, capacity)

		maxMemberENI = limit.MemberAdapterLimit

		poolConfig.MaxIPPerENI = ipPerENI
	}

	requireMeta := true
	if cfg.IPAMType == types.IPAMTypeCRD {
		poolConfig.MaxPoolSize = 0
		poolConfig.MinPoolSize = 0

		if cfg.DisableDevicePlugin {
			requireMeta = false
		}
	}

	if requireMeta {
		ins := aliyun.GetInstanceMeta()
		zone := ins.ZoneID

		// Prefer the vSwitches configured for this zone; indexing a nil map
		// is safe and yields an empty result. Fall back to the instance's
		// own vSwitch when nothing is configured.
		if zoneVSwitches := cfg.VSwitches[zone]; len(zoneVSwitches) > 0 {
			poolConfig.VSwitchOptions = zoneVSwitches
		}
		if len(poolConfig.VSwitchOptions) == 0 {
			poolConfig.VSwitchOptions = []string{ins.VSwitchID}
		}
		poolConfig.ZoneID = zone
		poolConfig.InstanceID = ins.InstanceID
	}

	poolConfig.Capacity = capacity
	poolConfig.MaxENI = maxENI
	poolConfig.MaxMemberENI = maxMemberENI

	return poolConfig, nil
}

// poolMaxENI returns the number of ENIs the pool may use: the adapter limit
// scaled by EniCapRatio, shifted by EniCapShift, minus one, never below zero,
// and further capped by cfg.MaxENI when that is set to a positive value.
func poolMaxENI(cfg *daemon.Config, limit *aliyun.Limits) int {
	maxENI := int(float64(limit.Adapters)*cfg.EniCapRatio) + cfg.EniCapShift - 1
	if maxENI < 0 {
		maxENI = 0
	}
	if cfg.MaxENI > 0 && cfg.MaxENI < maxENI {
		maxENI = cfg.MaxENI
	}
	return maxENI
}

// applyPoolSizeBounds stores maxPoolSize capped at capacity, stores
// minPoolSize when it is positive, and keeps MinPoolSize <= MaxPoolSize.
func applyPoolSizeBounds(poolConfig *types.PoolConfig, maxPoolSize, minPoolSize, capacity int) {
	if maxPoolSize > capacity {
		maxPoolSize = capacity
	}
	poolConfig.MaxPoolSize = maxPoolSize

	if minPoolSize > 0 {
		poolConfig.MinPoolSize = minPoolSize
	}
	if poolConfig.MinPoolSize > poolConfig.MaxPoolSize {
		poolConfig.MinPoolSize = poolConfig.MaxPoolSize
	}
}
180 changes: 110 additions & 70 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"sync"
"time"

"github.com/AliyunContainerService/terway/deviceplugin"
"github.com/AliyunContainerService/terway/pkg/aliyun"
"github.com/AliyunContainerService/terway/pkg/aliyun/client"
"github.com/AliyunContainerService/terway/pkg/aliyun/credential"
podENITypes "github.com/AliyunContainerService/terway/pkg/apis/network.alibabacloud.com/v1beta1"
"github.com/AliyunContainerService/terway/pkg/backoff"
vswpool "github.com/AliyunContainerService/terway/pkg/controller/vswitch"
terwayIP "github.com/AliyunContainerService/terway/pkg/ip"
"github.com/AliyunContainerService/terway/pkg/link"
"github.com/AliyunContainerService/terway/pkg/logger"
Expand Down Expand Up @@ -1335,7 +1337,7 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
if ipFamily.IPv6 {
if !limit.SupportIPv6() {
ipFamily.IPv6 = false
serviceLog.Warnf("instance %s is not support ipv6", aliyun.GetInstanceMeta().InstanceType)
serviceLog.Warnf("instance %s is not support ipv6", config.InstanceType)
} else if daemonMode == daemonModeENIMultiIP && !limit.SupportMultiIPIPv6() {
ipFamily.IPv6 = false
serviceLog.Warnf("instance %s is not support ipv6", config.InstanceType)
Expand All @@ -1344,22 +1346,6 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (

ecs := aliyun.NewAliyunImpl(aliyunClient, config.EnableENITrunking && !config.WaitTrunkENI, ipFamily, config.ENITagFilter)

netSrv.enableTrunk = config.EnableENITrunking

ipNetSet := &types.IPNetSet{}
if config.ServiceCIDR != "" {
cidrs := strings.Split(config.ServiceCIDR, ",")

for _, cidr := range cidrs {
ipNetSet.SetIPNet(cidr)
}
}

err = netSrv.k8s.SetSvcCidr(ipNetSet)
if err != nil {
return nil, errors.Wrapf(err, "error set k8s svcCidr")
}

netSrv.resourceDB, err = storage.NewDiskStorage(
resDBName, utils.NormalizePath(resDBPath), json.Marshal, func(bytes []byte) (interface{}, error) {
resourceRel := &types.PodResources{}
Expand All @@ -1373,13 +1359,108 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
return nil, errors.Wrapf(err, "error init resource manager storage")
}

nodeAnnotations := map[string]string{}

// get pool config
poolConfig, err := getPoolConfig(config)
poolConfig, err := getPoolConfig(config, daemonMode, limit)
if err != nil {
return nil, errors.Wrapf(err, "error get pool config")
}
serviceLog.Infof("init pool config: %+v", poolConfig)

vswPool, err := vswpool.NewSwitchPool(100, "10m")
if err != nil {
return nil, fmt.Errorf("error init vsw pool, %w", err)
}

// init trunk
if config.EnableENITrunking {
preferTrunkID := netSrv.k8s.GetTrunkID()
if preferTrunkID == "" && config.WaitTrunkENI {
preferTrunkID, err = netSrv.k8s.WaitTrunkReady()
if err != nil {
return nil, fmt.Errorf("error wait trunk ready, %w", err)
}
}

if !config.WaitTrunkENI {
enis, err := ecs.GetAttachedENIs(ctx, false, preferTrunkID)
if err != nil {
return nil, fmt.Errorf("error get attached eni, %w", err)
}
found := false
for _, eni := range enis {
if eni.Trunk && eni.ID == preferTrunkID {
found = true

poolConfig.TrunkENIID = preferTrunkID
netSrv.enableTrunk = true

nodeAnnotations[types.TrunkOn] = preferTrunkID
nodeAnnotations[string(types.MemberENIIPTypeIPs)] = strconv.Itoa(poolConfig.MaxMemberENI)
break
}
}
if !found {
if poolConfig.MaxENI > len(enis) {
vsw, err := vswPool.GetOne(ctx, ecs, poolConfig.ZoneID, poolConfig.VSwitchOptions, &vswpool.SelectOptions{
VSwitchSelectPolicy: vswpool.VSwitchSelectionPolicyMost,
})
if err != nil {
return nil, fmt.Errorf("error get vsw, %w", err)
}

eni, err := ecs.AllocateENI(ctx, vsw.ID, poolConfig.SecurityGroupIDs, poolConfig.InstanceID, true, 1, poolConfig.ENITags)
if err != nil {
return nil, fmt.Errorf("error allocate eni, %w", err)
}

poolConfig.TrunkENIID = eni.ID
netSrv.enableTrunk = true

nodeAnnotations[types.TrunkOn] = eni.ID
nodeAnnotations[string(types.MemberENIIPTypeIPs)] = strconv.Itoa(poolConfig.MaxMemberENI)
} else {
serviceLog.Warnf("no trunk eni found, fallback to non-trunk mode")

config.EnableENITrunking = false
config.DisableDevicePlugin = true
}
}
} else {
// WaitTrunkENI enabled, we believe what we got.
poolConfig.TrunkENIID = preferTrunkID
netSrv.enableTrunk = true

nodeAnnotations[types.TrunkOn] = preferTrunkID
nodeAnnotations[string(types.MemberENIIPTypeIPs)] = strconv.Itoa(poolConfig.MaxMemberENI)
}
}

if daemonMode != daemonModeVPC {
nodeAnnotations[string(types.NormalIPTypeIPs)] = strconv.Itoa(poolConfig.Capacity)
}

if !(daemonMode == daemonModeENIMultiIP && !config.EnableENITrunking) {
if !config.DisableDevicePlugin {
res := deviceplugin.ENITypeENI
capacity := poolConfig.MaxENI
if config.EnableENITrunking {
res = deviceplugin.ENITypeMember
capacity = poolConfig.MaxMemberENI
}

dp := deviceplugin.NewENIDevicePlugin(capacity, res)
go dp.Serve()
}
}

// ensure node annotations
err = netSrv.k8s.PatchNodeAnnotations(nodeAnnotations)
if err != nil {
return nil, fmt.Errorf("error patch node annotations, %w", err)
}

localResource := make(map[string]map[string]resourceManagerInitItem)
resObjList, err := netSrv.resourceDB.List()
if err != nil {
Expand Down Expand Up @@ -1454,16 +1535,18 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
panic("unsupported daemon mode" + daemonMode)
}

//start gc loop
netSrv.startGarbageCollectionLoop()
period := poolCheckPeriod
periodCfg := os.Getenv("POOL_CHECK_PERIOD_SECONDS")
periodSeconds, err := strconv.Atoi(periodCfg)
if err == nil {
period = time.Duration(periodSeconds) * time.Second
}
if config.IPAMType != types.IPAMTypeCRD {
//start gc loop
netSrv.startGarbageCollectionLoop()
period := poolCheckPeriod
periodCfg := os.Getenv("POOL_CHECK_PERIOD_SECONDS")
periodSeconds, err := strconv.Atoi(periodCfg)
if err == nil {
period = time.Duration(periodSeconds) * time.Second
}

go wait.JitterUntil(netSrv.startPeriodCheck, period, 1, true, wait.NeverStop)
go wait.JitterUntil(netSrv.startPeriodCheck, period, 1, true, wait.NeverStop)
}

// register for tracing
_ = tracing.Register(tracing.ResourceTypeNetworkService, "default", netSrv)
Expand All @@ -1473,49 +1556,6 @@ func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (
return netSrv, nil
}

// getPoolConfig builds the pool configuration for this node from the daemon
// config and the instance metadata.
//
// It fails when more than 5 security groups are configured. The vSwitch list
// is taken from cfg.VSwitches for the instance's zone, falling back to the
// instance's own vSwitch when that entry is missing or empty. When
// cfg.IPAMType is types.IPAMTypeCRD the pool and ENI sizes are zeroed.
func getPoolConfig(cfg *daemon.Config) (*types.PoolConfig, error) {
	securityGroups := cfg.GetSecurityGroups()
	if count := len(securityGroups); count > 5 {
		return nil, fmt.Errorf("security groups should not be more than 5, current %d", count)
	}

	meta := aliyun.GetInstanceMeta()

	// Indexing a nil map is safe and yields an empty slice, so a single
	// lookup covers both "no map" and "no entry for this zone".
	vSwitches := cfg.VSwitches[meta.ZoneID]
	if len(vSwitches) == 0 {
		vSwitches = []string{meta.VSwitchID}
	}

	poolConfig := &types.PoolConfig{
		MaxPoolSize:               cfg.MaxPoolSize,
		MinPoolSize:               cfg.MinPoolSize,
		MaxENI:                    cfg.MaxENI,
		MinENI:                    cfg.MinENI,
		EniCapRatio:               cfg.EniCapRatio,
		EniCapShift:               cfg.EniCapShift,
		SecurityGroups:            securityGroups,
		VSwitchSelectionPolicy:    cfg.VSwitchSelectionPolicy,
		EnableENITrunking:         cfg.EnableENITrunking,
		ENICapPolicy:              cfg.ENICapPolicy,
		DisableDevicePlugin:       cfg.DisableDevicePlugin,
		WaitTrunkENI:              cfg.WaitTrunkENI,
		DisableSecurityGroupCheck: cfg.DisableSecurityGroupCheck,
		VSwitch:                   vSwitches,
		ENITags:                   cfg.ENITags,
		VPC:                       meta.VPCID,
		InstanceID:                meta.InstanceID,
	}

	// In CRD IPAM mode the sizing fields are all reset to zero.
	if cfg.IPAMType == types.IPAMTypeCRD {
		poolConfig.MaxPoolSize, poolConfig.MinPoolSize = 0, 0
		poolConfig.MaxENI, poolConfig.MinENI = 0, 0
	}

	return poolConfig, nil
}

func parseExtraRoute(routes []podENITypes.Route) []*rpc.Route {
if routes == nil {
return nil
Expand Down
Loading

0 comments on commit 18a90db

Please sign in to comment.