Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ena-express #298

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitallowed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
key = 'ParallelClusterEnableEnaExpressPolicyArn'
15 changes: 13 additions & 2 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,16 @@ This project creates a ParallelCluster configuration file that is documented in
useOnDemand: bool
UseSpot: bool
DisableSimultaneousMultithreading: bool
EnableEfa: bool
PlacementGroupName: str
<a href="#include-instancetypes">InstanceTypes</a>:
- str
- str:
UseOnDemand: bool
UseSpot: bool
DisableSimultaneousMultithreading: bool
EnableEfa: bool
PlacementGroupName: str
<a href="#nodecounts">NodeCounts</a>:
<a href="#defaultmincount">DefaultMinCount</a>: str
<a href="#defaultmaxcount">DefaultMaxCount</a>: str
Expand Down Expand Up @@ -373,7 +377,14 @@ type: bool

default: False

Recommend to not use EFA unless necessary to avoid insufficient capacity errors when starting new instances in group or when multiple instance types in the group.
This will enable EFA for all compute resources with instances that support EFA.

This can also be controlled for individual instance types in the InstanceConfig section.

If EFA is enabled without specifying a placement group name, then each compute resource is assigned its own managed placement group.

NOTE: Most EDA workloads cannot take advantage of EFA because they don't use MPI or NCCL.
I recommend to not use EFA unless necessary to avoid insufficient capacity errors when starting new instances in group or when multiple instance types are in the group.

See [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html#placement-groups-cluster](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html#placement-groups-cluster)

Expand Down Expand Up @@ -827,7 +838,7 @@ Exclude patterns are processed first and take precedence over any includes.
Instance families and types are regular expressions with implicit '^' and '$' at the begining and end.

Each element in the array can be either a regular expression string or a dictionary where the only key
is the regular expression string and that has overrides **UseOnDemand**, **UseSpot**, and **DisableSimultaneousMultithreading** for the matching instance families or instance types.
is the regular expression string and that has overrides **UseOnDemand**, **UseSpot**, **DisableSimultaneousMultithreading**, **EnableEfa**, and **PlacementGroupName** for the matching instance families or instance types.

The settings for instance families overrides the defaults, and the settings for instance types override the others.

Expand Down
18 changes: 15 additions & 3 deletions source/EC2InstanceTypeInfoPkg/EC2InstanceTypeInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,24 @@ def __init__(self, regions, get_savings_plans=True, json_filename=None, debug=Fa
# Endpoints only supported in 2 regions: https://docs.aws.amazon.com/cli/latest/reference/pricing/index.html
self.pricing_client = boto3.client('pricing', region_name='us-east-1')

# Check region names first to make sure opt-in regions are enabled
self.region_names = {}
missing_region_names = False
for region in sorted(self.regions):
if region in self.instance_type_and_family_info and json_filename:
logger.info(f'Using EC2 instance info from {json_filename} for {region}')
continue
region_name = self.get_region_name(region)
if not region_name:
logger.error(f"Could not find region name for {region}. Is this a new region or does it need to be enabled for your account?")
missing_region_names = True
continue
self.region_names[region] = region_name
if missing_region_names:
exit(1)

for region in sorted(self.regions):
if region in self.instance_type_and_family_info and json_filename:
logger.info(f'Using EC2 instance info from {json_filename} for {region}')
continue
region_name = self.region_names[region]
logger.info(f'Getting EC2 instance info for {region} ({region_name})')
assert(self.valid_credentials)
self.ec2_client = boto3.client('ec2', region_name=region)
Expand Down Expand Up @@ -187,6 +197,7 @@ def get_instance_type_and_family_info(self, region):
instance_type_info[instanceType]['Hypervisor'] = instanceTypeDict.get('Hypervisor', '')
instance_type_info[instanceType]['NetworkPerformance'] = instanceTypeDict['NetworkInfo']['NetworkPerformance']
instance_type_info[instanceType]['EfaSupported'] = instanceTypeDict['NetworkInfo']['EfaSupported']
instance_type_info[instanceType]['EnaSrdSupported'] = instanceTypeDict['NetworkInfo']['EnaSrdSupported']
if 'GpuInfo' in instanceTypeDict and 'Gpus' in instanceTypeDict['GpuInfo']:
instance_type_info[instanceType]['GpuCount'] = int(instanceTypeDict['GpuInfo']['Gpus'][0].get('Count', 0))
instance_type_info[instanceType]['GpuManufacturer'] = instanceTypeDict['GpuInfo']['Gpus'][0].get('Manufacturer', "")
Expand Down Expand Up @@ -528,6 +539,7 @@ def get_region_name(self, region_code):
with open(endpoint_file, 'r') as f:
data = json.load(f)
missing_region_names = {
'ap-southeast-5': {'description': 'Asia Pacific (Malaysia)'},
'ca-west-1': {'description': 'Canada (Calgary)'}
}
for missing_region in missing_region_names:
Expand Down
8 changes: 8 additions & 0 deletions source/EC2InstanceTypeInfoPkg/get_ec2_instance_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
from EC2InstanceTypeInfoPkg.EC2InstanceTypeInfo import EC2InstanceTypeInfo
import logging
from sys import exit
from VersionCheck import logger as VersionCheck_logger, VersionCheck

if __name__ == '__main__':
try:
parser = argparse.ArgumentParser(description="Get EC2 instance pricing info.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--region", "-r", type=str, default=[], action='append', help="AWS region(s) to get info for.")
parser.add_argument("--input", '-i', type=str, default=None, help="JSON input file. Reads existing info from previous runs. Can speed up rerun if it failed to collect the data for a region.")
parser.add_argument("--output-csv", '-o', type=str, default=None, help="CSV output file. Default: instance_type_info.csv")
parser.add_argument("--disable-version-check", action='store_const', const=True, default=False, help="Disable git version check")
parser.add_argument("--debug", "-d", action='store_const', const=True, default=False, help="Enable debug messages")
args = parser.parse_args()

if args.debug:
VersionCheck_logger.setLevel(logging.DEBUG)

if not args.disable_version_check and not VersionCheck().check_git_version():
exit(1)

if args.input:
print(f"Reading existing instance info from {args.input}")
ec2InstanceTypeInfo = EC2InstanceTypeInfo(args.region, json_filename=args.input, debug=args.debug)
Expand Down
4 changes: 2 additions & 2 deletions source/EC2InstanceTypeInfoPkg/retry_boto3_throttling.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ def f_retry(*args, **kwargs):
attempt += 1
return f(*args, **kwargs)
except ClientError as e:
logging.exception("Caught exception")
logging.debug("Caught exception")
if e.response['Error']['Code'] in ['RequestLimitExceeded', 'InternalError', 'ThrottlingException']:
pass
else:
logging.exception("Rethrew exception")
logging.debug("Rethrew exception")
raise e
logger.debug("%s" % (traceback.format_exc()))
logger.debug("attempt=%d" % attempt)
Expand Down
6 changes: 5 additions & 1 deletion source/SlurmPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1964,7 +1964,9 @@ def get_instance_types_from_instance_config(self, instance_config: dict, regions
default_instance_type_config = {
'UseOnDemand': instance_config['UseOnDemand'],
'UseSpot': instance_config['UseSpot'],
'DisableSimultaneousMultithreading': instance_config['DisableSimultaneousMultithreading']
'DisableSimultaneousMultithreading': instance_config['DisableSimultaneousMultithreading'],
'EnableEfa': instance_config['EnableEfa'],
'PlacementGroupName': instance_config.get('PlacementGroupName', None)
}

instance_types = {}
Expand Down Expand Up @@ -2069,6 +2071,8 @@ def get_instance_types_from_instance_config(self, instance_config: dict, regions
instance_type_config['UseOnDemand'] = instance_type_config.get('UseOnDemand', instance_family_config.get('UseOnDemand', default_instance_type_config['UseOnDemand']))
instance_type_config['UseSpot'] = instance_type_config.get('UseSpot', instance_family_config.get('UseSpot', default_instance_type_config['UseSpot']))
instance_type_config['DisableSimultaneousMultithreading'] = instance_type_config.get('DisableSimultaneousMultithreading', instance_family_config.get('DisableSimultaneousMultithreading', default_instance_type_config['DisableSimultaneousMultithreading']))
instance_type_config['EnableEfa'] = instance_type_config.get('EnableEfa', instance_family_config.get('EnableEfa', default_instance_type_config['EnableEfa']))
instance_type_config['PlacementGroupName'] = instance_type_config.get('PlacementGroupName', instance_family_config.get('PlacementGroupName', default_instance_type_config['PlacementGroupName']))

region_instance_types[instance_type] = instance_type_config

Expand Down
30 changes: 26 additions & 4 deletions source/cdk/cdk_slurm_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,20 @@ def create_parallel_cluster_assets(self):
# If use managed_policy_name, then get the following cfn_nag warning.
# W28: Resource found with an explicit name, this disallows updates that require replacement of this resource

self.parallel_cluster_enable_ena_express_policy = iam.ManagedPolicy(
self, "ParallelClusterEnableEnaExpressPolicy",
path = '/parallelcluster/',
statements = [
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
'ec2:ModifyNetworkInterfaceAttribute',
],
resources=['*']
)
]
)

self.create_munge_key_secret()

self.playbooks_asset = s3_assets.Asset(self, 'Playbooks',
Expand Down Expand Up @@ -2760,7 +2774,7 @@ def create_parallel_cluster_config(self):
if not instance_type_config['UseSpot']:
continue
logger.debug(f"Creating queue for {purchase_option} {instance_type}")
efa_supported = self.plugin.get_EfaSupported(self.cluster_region, instance_type) and self.config['slurm']['ParallelClusterConfig']['EnableEfa']
efa_enabled = self.plugin.get_EfaSupported(self.cluster_region, instance_type) and instance_type_config['EnableEfa']
mem_mb = self.plugin.get_MemoryInMiB(self.cluster_region, instance_type)
mem_gb = int(mem_mb / 1024)
core_count = int(self.plugin.get_CoreCount(self.cluster_region, instance_type))
Expand Down Expand Up @@ -2828,18 +2842,21 @@ def create_parallel_cluster_config(self):
'MaxCount': max_count,
'DisableSimultaneousMultithreading': instance_type_config['DisableSimultaneousMultithreading'],
'Instances': [],
'Efa': {'Enabled': efa_supported},
'Efa': {'Enabled': efa_enabled},
'Networking': {
'PlacementGroup': {
'Enabled': efa_supported
'Enabled': efa_enabled
}
}
}
if efa_enabled and instance_type_config['PlacementGroupName']:
compute_resource['Networking']['PlacementGroup']['Name'] = instance_type_config['PlacementGroupName']
compute_resource['Instances'].append(
{
'InstanceType': instance_type
}
)

if config_schema.PARALLEL_CLUSTER_SUPPORTS_NODE_WEIGHTS(self.PARALLEL_CLUSTER_VERSION):
compute_resource['StaticNodePriority'] = int(price * 1000)
compute_resource['DynamicNodePriority'] = int(price * 10000)
Expand Down Expand Up @@ -3020,6 +3037,10 @@ def create_parallel_cluster_config(self):
key = 'ParallelClusterAssetReadPolicyArn',
value = self.parallel_cluster_asset_read_policy.managed_policy_arn
)
self.create_parallel_cluster_config_lambda.add_environment(
key = 'ParallelClusterEnableEnaExpressPolicyArn',
value = self.parallel_cluster_enable_ena_express_policy.managed_policy_arn
)
self.create_parallel_cluster_config_lambda.add_environment(
key = 'ParallelClusterJwtWritePolicyArn',
value = self.parallel_cluster_jwt_write_policy.managed_policy_arn
Expand Down Expand Up @@ -3161,7 +3182,8 @@ def create_queue_config(self, queue_name, allocation_strategy, purchase_option):
'AdditionalIamPolicies': [
{'Policy': 'arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore'},
{'Policy': '{{ParallelClusterAssetReadPolicyArn}}'},
{'Policy': '{{ParallelClusterSnsPublishPolicyArn}}'}
{'Policy': '{{ParallelClusterSnsPublishPolicyArn}}'},
{'Policy': '{{ParallelClusterEnableEnaExpressPolicyArn}}'}
]
},
'Networking': {
Expand Down
16 changes: 11 additions & 5 deletions source/cdk/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,9 +1530,6 @@ def get_config_schema(config):
},
Optional('Architecture', default=DEFAULT_ARCHITECTURE): And(str, lambda s: s in VALID_ARCHITECTURES),
Optional('ComputeNodeAmi'): And(str, lambda s: s.startswith('ami-')),
# Recommend to not use EFA unless necessary to avoid insufficient capacity errors when starting new instances in group or when multiple instance types in the group
# See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html#placement-groups-cluster
Optional('EnableEfa', default=False): bool,
Optional('Database'): {
Optional('DatabaseStackName'): str,
Optional('FQDN'): str,
Expand Down Expand Up @@ -1641,6 +1638,11 @@ def get_config_schema(config):
# Configure spot instances
Optional('UseSpot', default=True): bool,
Optional('DisableSimultaneousMultithreading', default=True): bool,
# This is a global setting that can be overridden for instance types in InstanceConfig.
# Recommend to not use EFA unless necessary to avoid insufficient capacity errors when starting new instances in group or when multiple instance types in the group
# See https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html#placement-groups-cluster
Optional('EnableEfa', default=False): bool,
Optional('PlacementGroupName'): str,
Optional('CpuVendor', default=cpu_vendors): [
And(str, lambda s: s in cpu_vendors)
],
Expand All @@ -1665,7 +1667,9 @@ def get_config_schema(config):
str: {
Optional('UseOnDemand'): bool,
Optional('UseSpot'): bool,
Optional('DisableSimultaneousMultithreading'): bool
Optional('DisableSimultaneousMultithreading'): bool,
Optional('EnableEfa'): bool,
Optional('PlacementGroupName'): str
}
},
lambda d: len(d) == 1
Expand All @@ -1680,7 +1684,9 @@ def get_config_schema(config):
str: {
Optional('UseOnDemand'): bool,
Optional('UseSpot'): bool,
Optional('DisableSimultaneousMultithreading'): bool
Optional('DisableSimultaneousMultithreading'): bool,
Optional('EnableEfa'): bool,
Optional('PlacementGroupName'): str
}
},
lambda d: len(d) == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ if [[ -e $config_dir/users_groups.json ]]; then
$config_bin_dir/create_users_groups.py -i $config_dir/users_groups.json
fi

# Enable ENA Express
TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
mac=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/network/interfaces/macs/)
eni_id=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/network/interfaces/macs/${mac}interface-id/)
aws ec2 modify-network-interface-attribute --network-interface-id ${eni_id} --ena-srd-specification 'EnaSrdEnabled=true,EnaSrdUdpSpecification={EnaSrdUdpEnabled=true}'

# ansible_compute_node_vars_yml_s3_url="s3://$assets_bucket/$assets_base_key/config/ansible/ansible_compute_node_vars.yml"

# # Configure using ansible
Expand Down