diff --git a/.buildkite/generate_pipeline.py b/.buildkite/generate_pipeline.py
index 3889105d02d..050e129135e 100644
--- a/.buildkite/generate_pipeline.py
+++ b/.buildkite/generate_pipeline.py
@@ -21,11 +21,12 @@
 clouds are not supported yet, smoke tests for those clouds are not generated.
 """
+import argparse
+import collections
 import os
-import random
 import re
 import subprocess
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 import click
 from conftest import cloud_to_pytest_keyword
 
@@ -64,8 +65,59 @@
                        'edit directly.\n')
 
 
-def _extract_marked_tests(file_path: str,
-                          filter_marks: List[str]) -> Dict[str, List[str]]:
+def _parse_args(args: Optional[str] = None):
+    """
+    Parse command-line arguments to figure out which clouds to run
+    and the -k pattern for tests.
+
+    :return: (list_of_clouds, k_pattern)
+    """
+    if args:
+        args_list = args.split()
+    else:
+        args_list = []
+    parser = argparse.ArgumentParser(
+        description="Process cloud arguments for tests")
+
+    # Flags for recognized clouds
+    for cloud in PYTEST_TO_CLOUD_KEYWORD.keys():
+        parser.add_argument(f"--{cloud}", action="store_true")
+
+    # Generic cloud argument, which takes a value (e.g., --generic-cloud aws)
+    parser.add_argument("--generic-cloud")
+
+    # -k argument for a test selection pattern
+    parser.add_argument("-k")
+
+    parsed_args, _ = parser.parse_known_args(args_list)
+
+    # Collect chosen clouds from the flags
+    # TODO(zpoint): get default clouds from the conftest.py
+    default_clouds_to_run = []
+    for cloud in PYTEST_TO_CLOUD_KEYWORD.keys():
+        if getattr(parsed_args, cloud):
+            default_clouds_to_run.append(cloud)
+    if default_clouds_to_run:
+        default_clouds_to_run = list(
+            set(default_clouds_to_run) & set(CLOUD_QUEUE_MAP.keys()))
+        # If the user passes clouds we don't support, fall back to the default.
+        if not default_clouds_to_run:
+            default_clouds_to_run = DEFAULT_CLOUDS_TO_RUN
+
+    # If a generic cloud is specified, it overrides any chosen clouds
+    if (parsed_args.generic_cloud and
+            parsed_args.generic_cloud in CLOUD_QUEUE_MAP):
+        default_clouds_to_run = [parsed_args.generic_cloud]
+
+    if not default_clouds_to_run:
+        default_clouds_to_run = DEFAULT_CLOUDS_TO_RUN
+
+    return default_clouds_to_run, parsed_args.k
+
+
+def _extract_marked_tests(
+    file_path: str, args: str
+) -> Dict[str, Tuple[List[str], List[str], List[Optional[str]]]]:
     """Extract test functions and filter clouds using pytest.mark
     from a Python test file.
 
@@ -79,43 +131,58 @@ def _extract_marked_tests(file_path: str,
     rerun failures. Additionally, the parallelism would be controlled by pytest
     instead of the buildkite job queue.
     """
-    cmd = f'pytest {file_path} --collect-only'
+    cmd = f'pytest {file_path} --collect-only {args}'
     output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
     matches = re.findall('Collected .+?\.py::(.+?) with marks: \[(.*?)\]', output.stdout)
-    function_name_marks_map = {}
+    print(f'args: {args}')
+    default_clouds_to_run, k_value = _parse_args(args)
+
+    print(f'default_clouds_to_run: {default_clouds_to_run}, k_value: {k_value}')
+    function_name_marks_map = collections.defaultdict(set)
+    function_name_param_map = collections.defaultdict(list)
+
     for function_name, marks in matches:
-        function_name = re.sub(r'\[.*?\]', '', function_name)
+        clean_function_name = re.sub(r'\[.*?\]', '', function_name)
+        clean_function_name = re.sub(r'@.*?$', '', clean_function_name)
+        # The skip mark is generated automatically by pytest and printed in
+        # conftest.py.
+        if 'skip' in marks:
+            continue
+        if k_value is not None and k_value not in function_name:
+            # TODO(zpoint): support and/or in k_value
+            continue
+
         marks = marks.replace('\'', '').split(',')
         marks = [i.strip() for i in marks]
-        if function_name not in function_name_marks_map:
-            function_name_marks_map[function_name] = set(marks)
-        else:
-            function_name_marks_map[function_name].update(marks)
+
+        function_name_marks_map[clean_function_name].update(marks)
+
+        # extract parameter from function name
+        # example: test_skyserve_new_autoscaler_update[rolling]
+        # param: rolling
+        # function_name: test_skyserve_new_autoscaler_update
+        param = None
+        if '[' in function_name and 'serve' in marks:
+            # Only serve tests are slow and flaky, so we separate them
+            # to different steps for parallel execution
+            param = re.search('\[(.+?)\]', function_name).group(1)
+        if param:
+            function_name_param_map[clean_function_name].append(param)
+
     function_cloud_map = {}
-    filter_marks = set(filter_marks)
     for function_name, marks in function_name_marks_map.items():
-        if filter_marks and not filter_marks & marks:
-            continue
         clouds_to_include = []
-        clouds_to_exclude = []
         is_serve_test = 'serve' in marks
         run_on_gke = 'requires_gke' in marks
         for mark in marks:
-            if mark.startswith('no_'):
-                clouds_to_exclude.append(mark[3:])
-            else:
-                if mark not in PYTEST_TO_CLOUD_KEYWORD:
-                    # This mark does not specify a cloud, so we skip it.
-                    continue
-                clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])
+            if mark not in PYTEST_TO_CLOUD_KEYWORD:
+                # This mark does not specify a cloud, so we skip it.
+                continue
+            clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])
         clouds_to_include = (clouds_to_include
-                             if clouds_to_include else DEFAULT_CLOUDS_TO_RUN)
-        clouds_to_include = [
-            cloud for cloud in clouds_to_include
-            if cloud not in clouds_to_exclude
-        ]
+                             if clouds_to_include else default_clouds_to_run)
         cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
         final_clouds_to_include = [
            cloud for cloud in clouds_to_include if cloud in cloud_queue_map
@@ -132,37 +199,57 @@ def _extract_marked_tests(file_path: str,
                 f'Warning: {function_name} is marked to run on {clouds_to_include}, '
                 f'but we only have credentials for {final_clouds_to_include}. '
                 f'clouds {excluded_clouds} are skipped.')
+
+        # pytest will only run the first cloud if there are multiple clouds;
+        # make it consistent with pytest behavior.
+        # print(f"final_clouds_to_include: {final_clouds_to_include}")
+        final_clouds_to_include = [final_clouds_to_include[0]]
+        param_list = function_name_param_map.get(function_name, [None])
+        if len(param_list) < len(final_clouds_to_include):
+            # align, so we can zip them together
+            param_list += [None
+                          ] * (len(final_clouds_to_include) - len(param_list))
         function_cloud_map[function_name] = (final_clouds_to_include, [
             QUEUE_GKE if run_on_gke else cloud_queue_map[cloud]
             for cloud in final_clouds_to_include
-        ])
+        ], param_list)
+
     return function_cloud_map
 
 
 def _generate_pipeline(test_file: str,
-                       filter_marks: List[str],
+                       args: str,
                        auto_retry: bool = False) -> Dict[str, Any]:
     """Generate a Buildkite pipeline from test files."""
     steps = []
-    function_cloud_map = _extract_marked_tests(test_file, filter_marks)
-    for test_function, clouds_and_queues in function_cloud_map.items():
-        for cloud, queue in zip(*clouds_and_queues):
+    generated_function_set = set()
+    function_cloud_map = _extract_marked_tests(test_file, args)
+    for test_function, clouds_queues_param in function_cloud_map.items():
+        for cloud, queue, param in zip(*clouds_queues_param):
+            if test_function in generated_function_set:
+                # Skip duplicate nested function tests under the same class
+                continue
+            label = f'{test_function} on {cloud}'
+            command = f'pytest {test_file}::{test_function} --{cloud}'
+            if param:
+                label += f' with param {param}'
+                command += f' -k {param}'
             step = {
-                'label': f'{test_function} on {cloud}',
-                'command': f'pytest {test_file}::{test_function} --{cloud}',
+                'label': label,
+                'command': command,
                 'agents': {
                     # Separate agent pool for each cloud.
                     # Since they require different amount of resources and
                     # concurrency control.
                     'queue': queue
-                },
-                'if': f'build.env("{cloud}") == "1"'
+                }
             }
             if auto_retry:
                 step['retry'] = {
                     # Automatically retry 2 times on any failure by default.
                     'automatic': True
                 }
+            generated_function_set.add(test_function)
             steps.append(step)
     return {'steps': steps}
@@ -170,7 +257,11 @@ def _generate_pipeline(test_file: str,
 def _dump_pipeline_to_file(yaml_file_path: str,
                            pipelines: List[Dict[str, Any]],
                            extra_env: Optional[Dict[str, str]] = None):
-    default_env = {'LOG_TO_STDOUT': '1', 'PYTHONPATH': '${PYTHONPATH}:$(pwd)'}
+    default_env = {
+        'LOG_TO_STDOUT': '1',
+        'PYTHONPATH': '${PYTHONPATH}:$(pwd)',
+        'SKYPILOT_DISABLE_USAGE_COLLECTION': '1'
+    }
     if extra_env:
         default_env.update(extra_env)
     with open(yaml_file_path, 'w', encoding='utf-8') as file:
@@ -178,28 +269,23 @@ def _dump_pipeline_to_file(yaml_file_path: str,
         all_steps = []
         for pipeline in pipelines:
             all_steps.extend(pipeline['steps'])
-        # Shuffle the steps to avoid flakyness, consecutive runs of the same
-        # kind of test may fail for requiring locks on the same resources.
-        random.shuffle(all_steps)
         final_pipeline = {'steps': all_steps, 'env': default_env}
         yaml.dump(final_pipeline, file, default_flow_style=False)
 
 
-def _convert_release(test_files: List[str], filter_marks: List[str]):
+def _convert_release(test_files: List[str], args: str):
     yaml_file_path = '.buildkite/pipeline_smoke_tests_release.yaml'
     output_file_pipelines = []
     for test_file in test_files:
         print(f'Converting {test_file} to {yaml_file_path}')
-        pipeline = _generate_pipeline(test_file, filter_marks, auto_retry=True)
+        pipeline = _generate_pipeline(test_file, args, auto_retry=True)
         output_file_pipelines.append(pipeline)
         print(f'Converted {test_file} to {yaml_file_path}\n\n')
     # Enable all clouds by default for release pipeline.
-    _dump_pipeline_to_file(yaml_file_path,
-                           output_file_pipelines,
-                           extra_env={cloud: '1' for cloud in CLOUD_QUEUE_MAP})
+    _dump_pipeline_to_file(yaml_file_path, output_file_pipelines)
 
 
-def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
+def _convert_quick_tests_core(test_files: List[str], args: str):
     yaml_file_path = '.buildkite/pipeline_smoke_tests_quick_tests_core.yaml'
     output_file_pipelines = []
     for test_file in test_files:
@@ -207,7 +293,7 @@ def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
         # We want enable all clouds by default for each test function
         # for pre-merge. And let the author controls which clouds
         # to run by parameter.
-        pipeline = _generate_pipeline(test_file, filter_marks)
+        pipeline = _generate_pipeline(test_file, args)
         pipeline['steps'].append({
             'label': 'Backward compatibility test',
             'command': 'bash tests/backward_compatibility_tests.sh',
@@ -223,11 +309,10 @@ def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
 
 
 @click.command()
-@click.option(
-    '--filter-marks',
-    type=str,
-    help='Filter to include only a subset of pytest marks, e.g., managed_jobs')
-def main(filter_marks):
+@click.option('--args',
+              type=str,
+              help='Args to pass to pytest, e.g., --managed-jobs --aws')
+def main(args):
     test_files = os.listdir('tests/smoke_tests')
     release_files = []
     quick_tests_core_files = []
@@ -240,15 +325,11 @@ def main(filter_marks):
         else:
             release_files.append(test_file_path)
 
-    filter_marks = filter_marks or os.getenv('FILTER_MARKS')
-    if filter_marks:
-        filter_marks = filter_marks.split(',')
-        print(f'Filter marks: {filter_marks}')
-    else:
-        filter_marks = []
+    args = args or os.getenv('ARGS', '')
+    print(f'args: {args}')
 
-    _convert_release(release_files, filter_marks)
-    _convert_quick_tests_core(quick_tests_core_files, filter_marks)
+    _convert_release(release_files, args)
+    _convert_quick_tests_core(quick_tests_core_files, args)
 
 
 if __name__ == '__main__':
diff --git a/.buildkite/test_buildkite_pipeline_generation.py b/.buildkite/test_buildkite_pipeline_generation.py
new file mode 100644
index 00000000000..7c68064d18a
--- /dev/null
+++ b/.buildkite/test_buildkite_pipeline_generation.py
@@ -0,0 +1,190 @@
+"""This script tests the buildkite pipeline generation script.
+
+It modifies the smoke test files to print the test name and return without
+running the actual test code, then runs the pipeline generation script
+and compares the output to the generated pipeline.
+
+Some parameters in smoke tests require credentials to set up, so we need to
+run the tests with the credentials.
+
+PYTHONPATH=$(pwd)/tests:$PYTHONPATH \
+pytest -n 0 --dist no .buildkite/test_buildkite_pipeline_generation.py
+
+"""
+
+import os
+import pathlib
+import re
+import subprocess
+
+import pytest
+import yaml
+
+
+def _insert_test_tracers(content):
+    """Matches any function definition starting with `def test_...(` possibly
+    spanning multiple lines, and inserts print statements.
+
+    1) print(function_name)
+    2) If 'generic_cloud' is in the parameters, prints "generic_cloud: {generic_cloud}".
+    3) return
+
+    Each of these inserted lines is indented 4 spaces more than the
+    function definition line.
+
+    Caveats:
+      • Very naive parameter parsing.
+      • Splits by commas, then strips out type annotations and defaults.
+      • If you have advanced signatures, you may need a more robust approach.
+    """
+
+    pattern = re.compile(r'^(\s*)(def\s+test_\w+\(.*?\)):\s*\n',
+                         flags=re.MULTILINE | re.DOTALL)
+
+    def replacer(match):
+        base_indent = match.group(1)  # e.g. "    "
+        signature = match.group(
+            2)  # e.g. "def test_job_queue(generic_cloud: str, x=42)"
+
+        # Indent our inserted lines 4 spaces beyond the function definition:
+        deeper_indent = base_indent + '    '
+
+        # Remove leading "def " so we can isolate function name + parameters
+        # signature_no_def might be "test_job_queue(generic_cloud: str, x=42)"
+        signature_no_def = signature[4:].strip()
+
+        # Try splitting on the first "("
+        try:
+            func_name, raw_params = signature_no_def.split('(', 1)
+            func_name = func_name.strip()
+            # Remove trailing ")" if it exists
+            if raw_params.endswith(')'):
+                raw_params = raw_params[:-1]
+            # Flatten newlines/spaces
+            raw_params = re.sub(r'\s+', ' ', raw_params).strip()
+        except ValueError:
+            # If splitting fails, fallback
+            func_name = signature_no_def
+            raw_params = ''
+
+        # --------------------------------------------------
+        # Parse out parameter names (naively)
+        # --------------------------------------------------
+        # 1) Split on commas.
+        # 2) For each piece, remove type annotations (":something")
+        #    and default values ("=something").
+        # 3) Strip off leading "*" or "**".
+        #    e.g. "generic_cloud: str" => "generic_cloud"
+        #         "x=42" => "x"
+        #         "**kwargs" => "kwargs"
+        # --------------------------------------------------
+        arg_list = []
+        if raw_params:
+            for piece in raw_params.split(','):
+                piece = piece.strip()
+                # Remove type annotations and defaults (split off first colon or equals)
+                piece = re.split(r'[:=]', piece, 1)[0]
+                # Remove leading "*" or "**"
+                piece = piece.lstrip('*').strip()
+                if piece:
+                    arg_list.append(piece)
+
+        # Build the lines to insert
+        lines = []
+        # Keep original definition line + colon
+        lines.append(f"{base_indent}{signature}:")
+        # 1) Print function name
+        lines.append(
+            f"{deeper_indent}print('\\n{func_name}\\n', file=sys.stderr, flush=True)"
+        )
+        # 2) Print generic_cloud if present
+        if 'generic_cloud' in arg_list:
+            lines.append(
+                f"{deeper_indent}print(f'generic_cloud: {{generic_cloud}}', file=sys.stderr, flush=True)"
+            )
+        # 3) Return
+        lines.append(f"{deeper_indent}return\n")
+
+        return "\n".join(lines)
+
+    updated_content = pattern.sub(replacer, content)
+    return 'import sys\n' + updated_content
+
+
+def _extract_test_names_from_pipeline(pipeline_path):
+    with open(pipeline_path, 'r') as f:
+        pipeline = yaml.safe_load(f)
+
+    test_names = set()
+    for step in pipeline['steps']:
+        command = step['command']
+        # Extract test name from pytest command
"pytest tests/smoke_tests/test_basic.py::test_example_app --aws" + assert '::' in command + test_name = command.split('::')[-1].split()[ + 0] # Split on space to remove args + test_names.add(test_name) + + return test_names + + +@pytest.mark.parametrize('args', [ + '', + '--aws', + '--gcp', + '--azure', + '--kubernetes', + '--generic-cloud aws', + '--generic-cloud gcp', + '--managed-jobs', + '--managed-jobs --serve', + '--managed-jobs --aws', +]) +def test_generate_same_as_pytest(args): + # Get all test files from smoke_tests directory + test_files = [ + f'tests/smoke_tests/{f}' for f in os.listdir('tests/smoke_tests') + if f.endswith('.py') and f != 'test_quick_tests_core.py' + ] + + pytest_tests = set() + try: + # Modify each test file to just print and return + for test_file in test_files: + with open(test_file, 'r') as f: + content = f.read() + + modified_content = _insert_test_tracers(content) + + with open(test_file, 'w') as f: + f.write(modified_content) + + # Get all test functions from pytest for all files + pytest_output = subprocess.check_output( + f"pytest ./tests/test_smoke.py {args}", + stderr=subprocess.STDOUT, + text=True, + shell=True) + pytest_tests = set(re.findall(r"test_\w+", pytest_output)) + + # Generate pipeline and extract test functions using YAML parsing + env = dict(os.environ) + env['PYTHONPATH'] = f"{pathlib.Path.cwd()}/tests:" \ + f"{env.get('PYTHONPATH', '')}" + + subprocess.run( + ['python', '.buildkite/generate_pipeline.py', '--args', args], + env=env, + check=True) + + # Extract test names using YAML parsing + pipeline_tests = _extract_test_names_from_pipeline( + '.buildkite/pipeline_smoke_tests_release.yaml') + + # Compare the sets + assert pytest_tests == pipeline_tests, \ + f'Mismatch between pytest tests {pytest_tests} and pipeline tests {pipeline_tests}' + + finally: + # Restore original files using git + subprocess.run(['git', 'reset', '--hard', 'HEAD'], check=True) diff --git a/tests/conftest.py b/tests/conftest.py index af6367fdac6..878580b2b4d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -121,12 +121,6 @@ def _get_cloud_to_run(config) -> List[str]: def pytest_collection_modifyitems(config, items): - if config.option.collectonly: - for item in items: - full_name = item.nodeid - marks = [mark.name for mark in item.iter_markers()] - print(f"Collected {full_name} with marks: {marks}") - skip_marks = {} skip_marks['slow'] = pytest.mark.skip(reason='need --runslow option to run') skip_marks['managed_jobs'] = pytest.mark.skip( @@ -191,6 +185,12 @@ def pytest_collection_modifyitems(config, items): item.add_marker(serial_mark) item._nodeid = f'{item.nodeid}@serial_{generic_cloud_keyword}' # See comment on item.nodeid above + if config.option.collectonly: + for item in items: + full_name = item.nodeid + marks = [mark.name for mark in item.iter_markers()] + print(f"Collected {full_name} with marks: {marks}") + def _is_generic_test(item) -> bool: for cloud in all_clouds_in_smoke_tests: diff --git a/tests/smoke_tests/smoke_tests_utils.py b/tests/smoke_tests/smoke_tests_utils.py index 14f2b94a5d4..5f90666ce3c 100644 --- a/tests/smoke_tests/smoke_tests_utils.py +++ b/tests/smoke_tests/smoke_tests_utils.py @@ -203,11 +203,6 @@ def get_cmd_wait_until_managed_job_status_contains_matching_job_name( timeout=timeout) -# After the timeout, the cluster will stop if autostop is set, and our check -# should be more than the timeout. To address this, we extend the timeout by -# _BUMP_UP_SECONDS before exiting. 
-BUMP_UP_SECONDS = 35
-
 DEFAULT_CMD_TIMEOUT = 15 * 60
 
 
diff --git a/tests/smoke_tests/test_basic.py b/tests/smoke_tests/test_basic.py
index 30576d3272f..d27305c3e49 100644
--- a/tests/smoke_tests/test_basic.py
+++ b/tests/smoke_tests/test_basic.py
@@ -139,7 +139,7 @@ def test_launch_fast_with_autostop(generic_cloud: str):
                 timeout=autostop_timeout),
             # Even the cluster is stopped, cloud platform may take a while to
             # delete the VM.
-            f'sleep {smoke_tests_utils.BUMP_UP_SECONDS}',
+            f'sleep 35',
             # Launch again. Do full output validation - we expect the cluster to re-launch
             f'unset SKYPILOT_DEBUG; s=$(sky launch -y -c {name} --fast -i 1 tests/test_yamls/minimal.yaml) && {smoke_tests_utils.VALIDATE_LAUNCH_OUTPUT}',
             f'sky logs {name} 2 --status',
diff --git a/tests/smoke_tests/test_cluster_job.py b/tests/smoke_tests/test_cluster_job.py
index 1fbb1b3d875..6aa52045c17 100644
--- a/tests/smoke_tests/test_cluster_job.py
+++ b/tests/smoke_tests/test_cluster_job.py
@@ -494,7 +494,7 @@ def test_inferentia():
         'test_inferentia',
         [
             f'sky launch -y -c {name} -t inf2.xlarge -- echo hi',
-            f'sky exec {name} --gpus Inferentia:1 echo hi',
+            f'sky exec {name} --gpus Inferentia2:1 echo hi',
             f'sky logs {name} 1 --status',  # Ensure the job succeeded.
             f'sky logs {name} 2 --status',  # Ensure the job succeeded.
         ],
@@ -569,6 +569,7 @@ def test_tpu_vm_pod():
 # ---------- TPU Pod Slice on GKE. ----------
 @pytest.mark.requires_gke
 @pytest.mark.kubernetes
+@pytest.mark.skip
 def test_tpu_pod_slice_gke():
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
@@ -1110,7 +1111,7 @@ def test_autostop(generic_cloud: str):
             smoke_tests_utils.get_cmd_wait_until_cluster_status_contains(
                 cluster_name=name,
                 cluster_status=[sky.ClusterStatus.STOPPED],
-                timeout=autostop_timeout + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=autostop_timeout),
         ],
         f'sky down -y {name}',
         timeout=total_timeout_minutes * 60,
@@ -1481,7 +1482,7 @@ def test_azure_start_stop_two_nodes():
                 cluster_status=[
                     sky.ClusterStatus.INIT, sky.ClusterStatus.STOPPED
                 ],
-                timeout=200 + smoke_tests_utils.BUMP_UP_SECONDS) +
+                timeout=235) +
             f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}'
         ],
         f'sky down -y {name}',
diff --git a/tests/smoke_tests/test_managed_job.py b/tests/smoke_tests/test_managed_job.py
index 1656e136398..ade189ecffe 100644
--- a/tests/smoke_tests/test_managed_job.py
+++ b/tests/smoke_tests/test_managed_job.py
@@ -21,7 +21,6 @@
 # # Change cloud for generic tests to aws
 # > pytest tests/smoke_tests/test_managed_job.py --generic-cloud aws
-
 import pathlib
 import re
 import tempfile
@@ -29,7 +28,7 @@
 import pytest
 
 from smoke_tests import smoke_tests_utils
-from smoke_tests.test_mount_and_storage import TestStorageWithCredentials
+from smoke_tests import test_mount_and_storage
 
 import sky
 from sky import jobs
@@ -43,7 +42,7 @@
 # when the controller being on Azure, which takes a long time for launching
 # step.
 @pytest.mark.managed_jobs
-def test_managed_jobs(generic_cloud: str):
+def test_managed_jobs_basic(generic_cloud: str):
     """Test the managed jobs yaml."""
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
@@ -103,7 +102,7 @@ def test_job_pipeline(generic_cloud: str):
     test = smoke_tests_utils.Test(
         'spot-pipeline',
         [
-            f'sky jobs launch -n {name} tests/test_yamls/pipeline.yaml -y -d',
+            f'sky jobs launch -n {name} tests/test_yamls/pipeline.yaml --cloud {generic_cloud} -y -d',
             'sleep 5',
             f'{smoke_tests_utils.GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING\|RUNNING"',
             # `grep -A 4 {name}` finds the job with {name} and the 4 lines
@@ -151,7 +150,7 @@ def test_managed_jobs_failed_setup(generic_cloud: str):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.FAILED_SETUP],
-                timeout=330 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=365),
         ],
         f'sky jobs cancel -y -n {name}',
         # Increase timeout since sky jobs queue -r can be blocked by other spot tests.
@@ -517,13 +516,13 @@ def test_managed_jobs_cancellation_aws(aws_config_region):
                 job_status=[
                     sky.ManagedJobStatus.STARTING, sky.ManagedJobStatus.RUNNING
                 ],
-                timeout=60 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=95),
             f'sky jobs cancel -y -n {name}',
             smoke_tests_utils.
             get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.CANCELLED],
-                timeout=120 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=155),
             (f's=$(aws ec2 describe-instances --region {region} '
              f'--filters Name=tag:ray-cluster-name,Values={name_on_cloud}-* '
              f'--query Reservations[].Instances[].State[].Name '
@@ -536,13 +535,13 @@ def test_managed_jobs_cancellation_aws(aws_config_region):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-2',
                 job_status=[sky.ManagedJobStatus.RUNNING],
-                timeout=300 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=335),
             f'sky jobs cancel -y -n {name}-2',
             smoke_tests_utils.
             get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-2',
                 job_status=[sky.ManagedJobStatus.CANCELLED],
-                timeout=120 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=155),
             (f's=$(aws ec2 describe-instances --region {region} '
              f'--filters Name=tag:ray-cluster-name,Values={name_2_on_cloud}-* '
             f'--query Reservations[].Instances[].State[].Name '
@@ -555,7 +554,7 @@ def test_managed_jobs_cancellation_aws(aws_config_region):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-3',
                 job_status=[sky.ManagedJobStatus.RUNNING],
-                timeout=300 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=335),
             # Terminate the cluster manually.
             (f'aws ec2 terminate-instances --region {region} --instance-ids $('
              f'aws ec2 describe-instances --region {region} '
@@ -569,7 +568,7 @@ def test_managed_jobs_cancellation_aws(aws_config_region):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-3',
                 job_status=[sky.ManagedJobStatus.CANCELLED],
-                timeout=120 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=155),
             # The cluster should be terminated (shutting-down) after cancellation. We don't use the `=` operator here because
             # there can be multiple VM with the same name due to the recovery.
             (f's=$(aws ec2 describe-instances --region {region} '
              f'--filters Name=tag:ray-cluster-name,Values={name_2_on_cloud}-* '
             f'--query Reservations[].Instances[].State[].Name '
@@ -608,13 +607,13 @@ def test_managed_jobs_cancellation_gcp():
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.STARTING],
-                timeout=60 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=95),
             f'sky jobs cancel -y -n {name}',
             smoke_tests_utils.
             get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.CANCELLED],
-                timeout=120 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=155),
             # Test cancelling the spot cluster during spot job being setup.
             f'sky jobs launch --cloud gcp --zone {zone} -n {name}-2 --use-spot tests/test_yamls/test_long_setup.yaml -y -d',
             # The job is set up in the cluster, will shown as RUNNING.
@@ -622,20 +621,20 @@ def test_managed_jobs_cancellation_gcp():
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-2',
                 job_status=[sky.ManagedJobStatus.RUNNING],
-                timeout=300 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=335),
             f'sky jobs cancel -y -n {name}-2',
             smoke_tests_utils.
             get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-2',
                 job_status=[sky.ManagedJobStatus.CANCELLED],
-                timeout=120 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=155),
             # Test cancellation during spot job is recovering.
             f'sky jobs launch --cloud gcp --zone {zone} -n {name}-3 --use-spot "sleep 1000" -y -d',
             smoke_tests_utils.
             get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-3',
                 job_status=[sky.ManagedJobStatus.RUNNING],
-                timeout=300 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=335),
             # Terminate the cluster manually.
             terminate_cmd,
             smoke_tests_utils.JOB_WAIT_NOT_RUNNING.format(job_name=f'{name}-3'),
@@ -645,7 +644,7 @@ def test_managed_jobs_cancellation_gcp():
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}-3',
                 job_status=[sky.ManagedJobStatus.CANCELLED],
-                timeout=120 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=155),
             # The cluster should be terminated (STOPPING) after cancellation. We don't use the `=` operator here because
             # there can be multiple VM with the same name due to the recovery.
             (f's=$({query_state_cmd}) && echo "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "PROVISIONING|STAGING|RUNNING|REPAIRING|TERMINATED|SUSPENDING|SUSPENDED|SUSPENDED"'
@@ -656,8 +655,11 @@ def test_managed_jobs_cancellation_gcp():
 
 
 @pytest.mark.managed_jobs
-def test_managed_jobs_retry_logs():
+def test_managed_jobs_retry_logs(generic_cloud: str):
     """Test managed job retry logs are properly displayed when a task fails."""
+    timeout = 7 * 60  # 7 mins
+    if generic_cloud == 'azure':
+        timeout *= 2
     name = smoke_tests_utils.get_cluster_name()
     yaml_path = 'tests/test_yamls/test_managed_jobs_retry.yaml'
 
@@ -677,8 +679,7 @@
                 f'! cat {log_file.name} | grep "Job 2"',
             ],
             f'sky jobs cancel -y -n {name}',
-            timeout=7 * 60,  # 7 mins
-        )
+            timeout=timeout)
         smoke_tests_utils.run_one_test(test)
 
 
@@ -709,31 +710,31 @@ def test_managed_jobs_storage(generic_cloud: str):
     if generic_cloud == 'aws':
         region = 'eu-central-1'
         region_flag = f' --region {region}'
-        region_cmd = TestStorageWithCredentials.cli_region_cmd(
+        region_cmd = test_mount_and_storage.TestStorageWithCredentials.cli_region_cmd(
             storage_lib.StoreType.S3, bucket_name=storage_name)
         region_validation_cmd = f'{region_cmd} | grep {region}'
-        s3_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
+        s3_check_file_count = test_mount_and_storage.TestStorageWithCredentials.cli_count_name_in_bucket(
             storage_lib.StoreType.S3, output_storage_name, 'output.txt')
         output_check_cmd = f'{s3_check_file_count} | grep 1'
     elif generic_cloud == 'gcp':
         region = 'us-west2'
         region_flag = f' --region {region}'
-        region_cmd = TestStorageWithCredentials.cli_region_cmd(
+        region_cmd = test_mount_and_storage.TestStorageWithCredentials.cli_region_cmd(
             storage_lib.StoreType.GCS, bucket_name=storage_name)
         region_validation_cmd = f'{region_cmd} | grep {region}'
-        gcs_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
+        gcs_check_file_count = test_mount_and_storage.TestStorageWithCredentials.cli_count_name_in_bucket(
             storage_lib.StoreType.GCS, output_storage_name, 'output.txt')
         output_check_cmd = f'{gcs_check_file_count} | grep 1'
     elif generic_cloud == 'azure':
         region = 'centralus'
         region_flag = f' --region {region}'
-        storage_account_name = (
-            storage_lib.AzureBlobStore.get_default_storage_account_name(region))
-        region_cmd = TestStorageWithCredentials.cli_region_cmd(
+        storage_account_name = test_mount_and_storage.TestStorageWithCredentials. \
+            get_az_storage_account_name(region)
+        region_cmd = test_mount_and_storage.TestStorageWithCredentials.cli_region_cmd(
             storage_lib.StoreType.AZURE,
             storage_account_name=storage_account_name)
         region_validation_cmd = f'{region_cmd} | grep {region}'
-        az_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
+        az_check_file_count = test_mount_and_storage.TestStorageWithCredentials.cli_count_name_in_bucket(
             storage_lib.StoreType.AZURE,
             output_storage_name,
             'output.txt',
@@ -742,10 +743,10 @@ def test_managed_jobs_storage(generic_cloud: str):
     elif generic_cloud == 'kubernetes':
         # With Kubernetes, we don't know which object storage provider is used.
         # Check both S3 and GCS if bucket exists in either.
-        s3_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
+        s3_check_file_count = test_mount_and_storage.TestStorageWithCredentials.cli_count_name_in_bucket(
             storage_lib.StoreType.S3, output_storage_name, 'output.txt')
         s3_output_check_cmd = f'{s3_check_file_count} | grep 1'
-        gcs_check_file_count = TestStorageWithCredentials.cli_count_name_in_bucket(
+        gcs_check_file_count = test_mount_and_storage.TestStorageWithCredentials.cli_count_name_in_bucket(
             storage_lib.StoreType.GCS, output_storage_name, 'output.txt')
         gcs_output_check_cmd = f'{gcs_check_file_count} | grep 1'
         output_check_cmd = f'{s3_output_check_cmd} || {gcs_output_check_cmd}'
@@ -767,7 +768,7 @@ def test_managed_jobs_storage(generic_cloud: str):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.SUCCEEDED],
-                timeout=60 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=95),
             # Wait for the job to be cleaned up.
             'sleep 20',
             f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]',
@@ -824,7 +825,7 @@ def test_managed_jobs_intermediate_storage(generic_cloud: str):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.SUCCEEDED],
-                timeout=60 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=95),
             # check intermediate bucket exists, it won't be deletd if its user specific
             f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{intermediate_storage_name}\')].Name" --output text | wc -l) -eq 1 ]',
         ],
@@ -853,7 +854,7 @@ def test_managed_jobs_tpu():
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.STARTING],
-                timeout=60 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=95),
             # TPU takes a while to launch
             smoke_tests_utils.
             get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[
                     sky.ManagedJobStatus.RUNNING, sky.ManagedJobStatus.SUCCEEDED
                 ],
-                timeout=900 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=935),
         ],
         f'sky jobs cancel -y -n {name}',
         # Increase timeout since sky jobs queue -r can be blocked by other spot tests.
@@ -883,7 +884,7 @@ def test_managed_jobs_inline_env(generic_cloud: str):
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=name,
                 job_status=[sky.ManagedJobStatus.SUCCEEDED],
-                timeout=20 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=55),
             f'JOB_ROW=$(sky jobs queue | grep {name} | head -n1) && '
             f'echo "$JOB_ROW" && echo "$JOB_ROW" | grep "SUCCEEDED" && '
             f'JOB_ID=$(echo "$JOB_ROW" | awk \'{{print $1}}\') && '
@@ -911,7 +912,7 @@ def test_managed_jobs_logs_sync_down():
            get_cmd_wait_until_managed_job_status_contains_matching_job_name(
                 job_name=f'{name}',
                 job_status=[sky.ManagedJobStatus.RUNNING],
-                timeout=300 + smoke_tests_utils.BUMP_UP_SECONDS),
+                timeout=335),
             f'sky jobs logs --controller 1 --sync-down',
             f'sky jobs logs 1 --sync-down',
             f'sky jobs logs --controller --name minimal --sync-down',
diff --git a/tests/smoke_tests/test_mount_and_storage.py b/tests/smoke_tests/test_mount_and_storage.py
index 3f2ddb16c57..a1147569d74 100644
--- a/tests/smoke_tests/test_mount_and_storage.py
+++ b/tests/smoke_tests/test_mount_and_storage.py
@@ -220,9 +220,8 @@ def test_azure_storage_mounts_with_stop():
     name = smoke_tests_utils.get_cluster_name()
     cloud = 'azure'
     storage_name = f'sky-test-{int(time.time())}'
-    default_region = 'eastus'
-    storage_account_name = (storage_lib.AzureBlobStore.
-                            get_default_storage_account_name(default_region))
+    storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+    )
     storage_account_key = data_utils.get_az_storage_account_key(
         storage_account_name)
     # if the file does not exist, az storage blob list returns '[]'
@@ -586,6 +585,18 @@ class TestStorageWithCredentials:
         },
     }
 
+    @staticmethod
+    def get_az_storage_account_name(default_region: str = 'centralus'):
+        config_storage_account = skypilot_config.get_nested(
+            ('azure', 'storage_account'), None)
+        if config_storage_account is not None:
+            storage_account_name = config_storage_account
+        else:
+            storage_account_name = (
+                storage_lib.AzureBlobStore.get_default_storage_account_name(
+                    default_region))
+        return storage_account_name
+
     @staticmethod
     def create_dir_structure(base_path, structure):
         # creates a given file STRUCTURE in BASE_PATH
@@ -612,10 +623,8 @@ def cli_delete_cmd(store_type,
             gsutil_alias, alias_gen = data_utils.get_gsutil_command()
             return f'{alias_gen}; {gsutil_alias} rm -r {url}'
         if store_type == storage_lib.StoreType.AZURE:
-            default_region = 'eastus'
-            storage_account_name = (
-                storage_lib.AzureBlobStore.get_default_storage_account_name(
-                    default_region))
+            storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+            )
             storage_account_key = data_utils.get_az_storage_account_key(
                 storage_account_name)
             return ('az storage container delete '
@@ -694,13 +703,8 @@ def cli_ls_cmd(store_type, bucket_name, suffix='', recursive=False):
             return f'gsutil ls {url}'
         if store_type == storage_lib.StoreType.AZURE:
             # azure isrecursive by default
-            default_region = 'eastus'
-            config_storage_account = skypilot_config.get_nested(
-                ('azure', 'storage_account'), None)
-            storage_account_name = config_storage_account if (
-                config_storage_account is not None) else (
-                    storage_lib.AzureBlobStore.get_default_storage_account_name(
-                        default_region))
+            storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+            )
             storage_account_key = data_utils.get_az_storage_account_key(
                 storage_account_name)
             list_cmd = ('az storage blob list '
@@ -762,10 +766,8 @@ def cli_count_name_in_bucket(store_type,
             return f'gsutil ls -r gs://{bucket_name} | grep "{file_name}" | wc -l'
         elif store_type == storage_lib.StoreType.AZURE:
             if storage_account_name is None:
-                default_region = 'eastus'
-                storage_account_name = (
-                    storage_lib.AzureBlobStore.get_default_storage_account_name(
-                        default_region))
+                storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+                )
             storage_account_key = data_utils.get_az_storage_account_key(
                 storage_account_name)
             return ('az storage blob list '
@@ -789,10 +791,8 @@ def cli_count_file_in_bucket(store_type, bucket_name):
         elif store_type == storage_lib.StoreType.GCS:
             return f'gsutil ls -r gs://{bucket_name}/** | wc -l'
         elif store_type == storage_lib.StoreType.AZURE:
-            default_region = 'eastus'
-            storage_account_name = (
-                storage_lib.AzureBlobStore.get_default_storage_account_name(
-                    default_region))
+            storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+            )
             storage_account_key = data_utils.get_az_storage_account_key(
                 storage_account_name)
             return ('az storage blob list '
@@ -1009,10 +1009,8 @@ def tmp_gsutil_bucket(self, tmp_bucket_name):
     @pytest.fixture
     def tmp_az_bucket(self, tmp_bucket_name):
         # Creates a temporary bucket using gsutil
-        default_region = 'eastus'
-        storage_account_name = (
-            storage_lib.AzureBlobStore.get_default_storage_account_name(
-                default_region))
+        storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+        )
         storage_account_key = data_utils.get_az_storage_account_key(
             storage_account_name)
         bucket_uri = data_utils.AZURE_CONTAINER_URL.format(
@@ -1299,10 +1297,8 @@ def test_nonexistent_bucket(self, nonexist_bucket_url):
             command = f'gsutil ls {nonexist_bucket_url.format(random_name=nonexist_bucket_name)}'
             expected_output = 'BucketNotFoundException'
         elif nonexist_bucket_url.startswith('https'):
-            default_region = 'eastus'
-            storage_account_name = (
-                storage_lib.AzureBlobStore.get_default_storage_account_name(
-                    default_region))
+            storage_account_name = TestStorageWithCredentials.get_az_storage_account_name(
+            )
             storage_account_key = data_utils.get_az_storage_account_key(
                 storage_account_name)
             command = f'az storage container exists --account-name {storage_account_name} --account-key {storage_account_key} --name {nonexist_bucket_name}'
@@ -1499,9 +1495,7 @@ def test_excluded_file_cloud_storage_upload_copy(self, gitignore_structure,
                                                      tmp_gitignore_storage_obj):
         # tests if files included in .gitignore and .git/info/exclude are
        # excluded from being transferred to Storage
-
         tmp_gitignore_storage_obj.add_store(store_type)
-
         upload_file_name = 'included'
         # Count the number of files with the given file name
         up_cmd = self.cli_count_name_in_bucket(store_type, \
@@ -1510,7 +1504,6 @@ def test_excluded_file_cloud_storage_upload_copy(self, gitignore_structure,
                         tmp_gitignore_storage_obj.name, file_name='.git')
         cnt_num_file_cmd = self.cli_count_file_in_bucket(
             store_type, tmp_gitignore_storage_obj.name)
-
         up_output = subprocess.check_output(up_cmd, shell=True)
         git_exclude_output = subprocess.check_output(git_exclude_cmd,
                                                      shell=True)
diff --git a/tests/smoke_tests/test_sky_serve.py b/tests/smoke_tests/test_sky_serve.py
index 01e137f40ef..fbc0701fe8c 100644
--- a/tests/smoke_tests/test_sky_serve.py
+++ b/tests/smoke_tests/test_sky_serve.py
@@ -801,9 +801,10 @@ def test_skyserve_failures(generic_cloud: str):
             f'until ! echo "$s" | grep "PENDING" && ! echo "$s" | grep "Please wait for the controller to be ready."; do '
             'echo "Waiting for replica to be out of pending..."; sleep 5; '
             f's=$(sky serve status {name}); echo "$s"; done; ' +
-            _check_replica_in_status(
-                name, [(1, False, 'FAILED_PROBING'),
-                       (1, False, _SERVICE_LAUNCHING_STATUS_REGEX)]),
+            _check_replica_in_status(name, [
+                (1, False, 'FAILED_PROBING'),
+                (1, False, _SERVICE_LAUNCHING_STATUS_REGEX + '\|READY')
+            ]),
             # TODO(zhwu): add test for FAILED_PROVISION
         ],
         _TEARDOWN_SERVICE.format(name=name),