Make url optional #2815
4 fail, 1 skipped, 246 pass in 4h 40m 53s
Annotations
github-actions / Test Results for assets-test
test_benchmark_result_aggregator_component[benchmark_result_aggregator_pipeline.yaml-True-True] (component/benchmark_result_aggregator.tests.benchmark_result_aggregator.test_benchmark_result_aggregator.TestBenchmarkResultAggregatorComponent) failed
pytest-reports/component/benchmark_result_aggregator.xml [took 10s]
Raw output
azure.ai.ml.exceptions.JobException: Exception :
{
"error": {
"code": "UserError",
"message": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
"message_format": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
"message_parameters": {},
"reference_code": "JobSubmissionFailure",
"details": []
},
"time": "0001-01-01T00:00:00.000Z",
"component_name": "JobSubmission"
}
self = <tests.benchmark_result_aggregator.test_benchmark_result_aggregator.TestBenchmarkResultAggregatorComponent object at 0x7fa8f7fd68d0>
temp_dir = '/tmp/pytest-of-runner/pytest-0/test_benchmark_result_aggregat0'
pipeline_file_name = 'benchmark_result_aggregator_pipeline.yaml'
has_perf_step = True, has_quality_step = True
@pytest.mark.parametrize(
"pipeline_file_name, has_perf_step, has_quality_step",
[
("benchmark_result_aggregator_pipeline.yaml", True, True),
("benchmark_result_aggregator_pipeline_no_quality.yaml", True, False),
("benchmark_result_aggregator_pipeline_no_perf.yaml", False, True),
],
)
def test_benchmark_result_aggregator_component(
self,
temp_dir: str,
pipeline_file_name: str,
has_perf_step: bool,
has_quality_step: bool,
) -> None:
"""Benchmark result aggregator component test."""
ml_client = get_mlclient()
pipeline_job = self._get_pipeline_job(
pipeline_file_name,
self.test_benchmark_result_aggregator_component.__name__,
)
# submit the pipeline job
pipeline_job = ml_client.create_or_update(
pipeline_job, experiment_name=self.EXP_NAME
)
> ml_client.jobs.stream(pipeline_job.name)
../../tests/benchmark_result_aggregator/test_benchmark_result_aggregator.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/core/tracing/decorator.py:116: in wrapper_use_tracer
return func(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/_telemetry/activity.py:288: in wrapper
return f(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/operations/_job_operations.py:838: in stream
self._stream_logs_until_completion(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
run_operations = <azure.ai.ml.operations._run_operations.RunOperations object at 0x7fa8da158c50>
job_resource = <azure.ai.ml._restclient.v2024_01_01_preview.models._models_py3.JobBase object at 0x7fa8dab3e5d0>
datastore_operations = <azure.ai.ml.operations._datastore_operations.DatastoreOperations object at 0x7fa8db20a410>
raise_exception_on_failed_job = True
def stream_logs_until_completion(
run_operations: RunOperations,
job_resource: JobBase,
datastore_operations: Optional[DatastoreOperations] = None,
raise_exception_on_failed_job: bool = True,
*,
requests_pipeline: HttpPipeline
) -> None:
"""Stream the experiment run output to the specified file handle. By default the the file handle points to stdout.
:param run_operations: The run history operations class.
:type run_operations: RunOperations
:param job_resource: The job to stream
:type job_resource: JobBase
:param datastore_operations: Optional, the datastore operations class, used to get logs from datastore
:type datastore_operations: Optional[DatastoreOperations]
:param raise_exception_on_failed_job: Should this method fail if job fails
:type raise_exception_on_failed_job: Boolean
:keyword requests_pipeline: The HTTP pipeline to use for requests.
:type requests_pipeline: ~azure.ai.ml._utils._http_utils.HttpPipeline
:return:
:rtype: None
"""
job_type = job_resource.properties.job_type
job_name = job_resource.name
studio_endpoint = job_resource.properties.services.get("Studio", None)
studio_endpoint = studio_endpoint.endpoint if studio_endpoint else None
# Feature store jobs should be linked to the Feature Store Workspace UI.
# Todo: Consolidate this logic to service side
if "azureml.FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["azureml.FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["azureml.FeatureSetName"],
fset_version=job_resource.properties.properties["azureml.FeatureSetVersion"],
run_id=job_name,
)
elif "FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["FeatureSetName"],
fset_version=job_resource.properties.properties["FeatureSetVersion"],
run_id=job_name,
)
file_handle = sys.stdout
ds_properties = None
prefix = None
if (
hasattr(job_resource.properties, "outputs")
and job_resource.properties.job_type != RestJobType.AUTO_ML
and datastore_operations
):
# Get default output location
default_output = (
job_resource.properties.outputs.get("default", None) if job_resource.properties.outputs else None
)
is_uri_folder = default_output and default_output.job_output_type == DataType.URI_FOLDER
if is_uri_folder:
output_uri = default_output.uri # type: ignore
# Parse the uri format
output_uri = output_uri.split("datastores/")[1]
datastore_name, prefix = output_uri.split("/", 1)
ds_properties = get_datastore_info(datastore_operations, datastore_name)
try:
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
_current_details: RunDetails = run_operations.get_run_details(job_name)
processed_logs: Dict = {}
poll_start_time = time.time()
pipeline_with_retries = create_requests_pipeline_with_retry(requests_pipeline=requests_pipeline)
while (
_current_details.status in RunHistoryConstants.IN_PROGRESS_STATUSES
or _current_details.status == JobStatus.FINALIZING
):
file_handle.flush()
time.sleep(_wait_before_polling(time.time() - poll_start_time))
_current_details = run_operations.get_run_details(job_name) # TODO use FileWatcher
if job_type.lower() in JobType.PIPELINE:
legacy_folder_name = "/logs/azureml/"
else:
legacy_folder_name = "/azureml-logs/"
_current_logs_dict = (
list_logs_in_datastore(
ds_properties,
prefix=str(prefix),
legacy_log_folder_name=legacy_folder_name,
)
if ds_properties is not None
else _current_details.log_files
)
# Get the list of new logs available after filtering out the processed ones
available_logs = _get_sorted_filtered_logs(_current_logs_dict, job_type, processed_logs)
content = ""
for current_log in available_logs:
content = download_text_from_url(
_current_logs_dict[current_log],
pipeline_with_retries,
timeout=RunHistoryConstants._DEFAULT_GET_CONTENT_TIMEOUT,
)
_incremental_print(content, processed_logs, current_log, file_handle)
# TODO: Temporary solution to wait for all the logs to be printed in the finalizing state.
if (
_current_details.status not in RunHistoryConstants.IN_PROGRESS_STATUSES
and _current_details.status == JobStatus.FINALIZING
and "The activity completed successfully. Finalizing run..." in content
):
break
file_handle.write("\n")
file_handle.write("Execution Summary\n")
file_handle.write("=================\n")
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
warnings = _current_details.warnings
if warnings:
messages = [x.message for x in warnings if x.message]
if len(messages) > 0:
file_handle.write("\nWarnings:\n")
for message in messages:
file_handle.write(message + "\n")
file_handle.write("\n")
if _current_details.status == JobStatus.FAILED:
error = (
_current_details.error.as_dict()
if _current_details.error
else "Detailed error not set on the Run. Please check the logs for details."
)
# If we are raising the error later on, don't double print it here.
if not raise_exception_on_failed_job:
file_handle.write("\nError:\n")
file_handle.write(json.dumps(error, indent=4))
file_handle.write("\n")
else:
> raise JobException(
message="Exception : \n {} ".format(json.dumps(error, indent=4)),
target=ErrorTarget.JOB,
no_personal_data_message="Exception raised on failed job.",
error_category=ErrorCategory.SYSTEM_ERROR,
)
E azure.ai.ml.exceptions.JobException: Exception :
E {
E "error": {
E "code": "UserError",
E "message": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
E "message_format": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
E "message_parameters": {},
E "reference_code": "JobSubmissionFailure",
E "details": []
E },
E "time": "0001-01-01T00:00:00.000Z",
E "component_name": "JobSubmission"
E }
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/operations/_job_ops_helper.py:334: JobException
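Note: both flagged asset IDs end in labels/latest, so the submission failure points at label resolution for the shared azureml registry components (model_prediction, compute_metrics) rather than at the aggregator component under test. As a diagnostic aid only, the sketch below, which is not part of the test code, tries to resolve the same two components through a registry-scoped MLClient; the client setup and the pinned-version suggestion are assumptions.

# Hypothetical diagnostic: can the flagged registry components be resolved at all?
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Registry-scoped client for the shared "azureml" registry (no workspace required).
registry_client = MLClient(credential=DefaultAzureCredential(), registry_name="azureml")

for component_name in ("model_prediction", "compute_metrics"):
    try:
        # Same "latest" label that the pipeline reference in the error message uses.
        component = registry_client.components.get(name=component_name, label="latest")
        print(f"{component_name}: label 'latest' resolved to version {component.version}")
    except Exception as exc:  # diagnostic sketch only
        print(f"{component_name}: label 'latest' did not resolve: {exc}")

# If label resolution is the culprit, pinning an explicit version in the pipeline
# YAML is one way to unblock the test, e.g. (placeholder version):
# azureml://registries/azureml/components/model_prediction/versions/<pinned-version>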
github-actions / Test Results for assets-test
test_benchmark_result_aggregator_component[benchmark_result_aggregator_pipeline_no_quality.yaml-True-False] (component/benchmar…lt_aggregator.tests.benchmark_result_aggregator.test_benchmark_result_aggregator.TestBenchmarkResultAggregatorComponent) failed
pytest-reports/component/benchmark_result_aggregator.xml [took 7s]
Raw output
azure.ai.ml.exceptions.JobException: Exception :
{
"error": {
"code": "UserError",
"message": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latest",
"message_format": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latest",
"message_parameters": {},
"reference_code": "JobSubmissionFailure",
"details": []
},
"time": "0001-01-01T00:00:00.000Z",
"component_name": "JobSubmission"
}
self = <tests.benchmark_result_aggregator.test_benchmark_result_aggregator.TestBenchmarkResultAggregatorComponent object at 0x7fa8daa72bd0>
temp_dir = '/tmp/pytest-of-runner/pytest-0/test_benchmark_result_aggregat1'
pipeline_file_name = 'benchmark_result_aggregator_pipeline_no_quality.yaml'
has_perf_step = True, has_quality_step = False
@pytest.mark.parametrize(
"pipeline_file_name, has_perf_step, has_quality_step",
[
("benchmark_result_aggregator_pipeline.yaml", True, True),
("benchmark_result_aggregator_pipeline_no_quality.yaml", True, False),
("benchmark_result_aggregator_pipeline_no_perf.yaml", False, True),
],
)
def test_benchmark_result_aggregator_component(
self,
temp_dir: str,
pipeline_file_name: str,
has_perf_step: bool,
has_quality_step: bool,
) -> None:
"""Benchmark result aggregator component test."""
ml_client = get_mlclient()
pipeline_job = self._get_pipeline_job(
pipeline_file_name,
self.test_benchmark_result_aggregator_component.__name__,
)
# submit the pipeline job
pipeline_job = ml_client.create_or_update(
pipeline_job, experiment_name=self.EXP_NAME
)
> ml_client.jobs.stream(pipeline_job.name)
../../tests/benchmark_result_aggregator/test_benchmark_result_aggregator.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/core/tracing/decorator.py:116: in wrapper_use_tracer
return func(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/_telemetry/activity.py:288: in wrapper
return f(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/operations/_job_operations.py:838: in stream
self._stream_logs_until_completion(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
run_operations = <azure.ai.ml.operations._run_operations.RunOperations object at 0x7fa8da12f450>
job_resource = <azure.ai.ml._restclient.v2024_01_01_preview.models._models_py3.JobBase object at 0x7fa8da351a50>
datastore_operations = <azure.ai.ml.operations._datastore_operations.DatastoreOperations object at 0x7fa8da3f8690>
raise_exception_on_failed_job = True
def stream_logs_until_completion(
run_operations: RunOperations,
job_resource: JobBase,
datastore_operations: Optional[DatastoreOperations] = None,
raise_exception_on_failed_job: bool = True,
*,
requests_pipeline: HttpPipeline
) -> None:
"""Stream the experiment run output to the specified file handle. By default the the file handle points to stdout.
:param run_operations: The run history operations class.
:type run_operations: RunOperations
:param job_resource: The job to stream
:type job_resource: JobBase
:param datastore_operations: Optional, the datastore operations class, used to get logs from datastore
:type datastore_operations: Optional[DatastoreOperations]
:param raise_exception_on_failed_job: Should this method fail if job fails
:type raise_exception_on_failed_job: Boolean
:keyword requests_pipeline: The HTTP pipeline to use for requests.
:type requests_pipeline: ~azure.ai.ml._utils._http_utils.HttpPipeline
:return:
:rtype: None
"""
job_type = job_resource.properties.job_type
job_name = job_resource.name
studio_endpoint = job_resource.properties.services.get("Studio", None)
studio_endpoint = studio_endpoint.endpoint if studio_endpoint else None
# Feature store jobs should be linked to the Feature Store Workspace UI.
# Todo: Consolidate this logic to service side
if "azureml.FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["azureml.FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["azureml.FeatureSetName"],
fset_version=job_resource.properties.properties["azureml.FeatureSetVersion"],
run_id=job_name,
)
elif "FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["FeatureSetName"],
fset_version=job_resource.properties.properties["FeatureSetVersion"],
run_id=job_name,
)
file_handle = sys.stdout
ds_properties = None
prefix = None
if (
hasattr(job_resource.properties, "outputs")
and job_resource.properties.job_type != RestJobType.AUTO_ML
and datastore_operations
):
# Get default output location
default_output = (
job_resource.properties.outputs.get("default", None) if job_resource.properties.outputs else None
)
is_uri_folder = default_output and default_output.job_output_type == DataType.URI_FOLDER
if is_uri_folder:
output_uri = default_output.uri # type: ignore
# Parse the uri format
output_uri = output_uri.split("datastores/")[1]
datastore_name, prefix = output_uri.split("/", 1)
ds_properties = get_datastore_info(datastore_operations, datastore_name)
try:
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
_current_details: RunDetails = run_operations.get_run_details(job_name)
processed_logs: Dict = {}
poll_start_time = time.time()
pipeline_with_retries = create_requests_pipeline_with_retry(requests_pipeline=requests_pipeline)
while (
_current_details.status in RunHistoryConstants.IN_PROGRESS_STATUSES
or _current_details.status == JobStatus.FINALIZING
):
file_handle.flush()
time.sleep(_wait_before_polling(time.time() - poll_start_time))
_current_details = run_operations.get_run_details(job_name) # TODO use FileWatcher
if job_type.lower() in JobType.PIPELINE:
legacy_folder_name = "/logs/azureml/"
else:
legacy_folder_name = "/azureml-logs/"
_current_logs_dict = (
list_logs_in_datastore(
ds_properties,
prefix=str(prefix),
legacy_log_folder_name=legacy_folder_name,
)
if ds_properties is not None
else _current_details.log_files
)
# Get the list of new logs available after filtering out the processed ones
available_logs = _get_sorted_filtered_logs(_current_logs_dict, job_type, processed_logs)
content = ""
for current_log in available_logs:
content = download_text_from_url(
_current_logs_dict[current_log],
pipeline_with_retries,
timeout=RunHistoryConstants._DEFAULT_GET_CONTENT_TIMEOUT,
)
_incremental_print(content, processed_logs, current_log, file_handle)
# TODO: Temporary solution to wait for all the logs to be printed in the finalizing state.
if (
_current_details.status not in RunHistoryConstants.IN_PROGRESS_STATUSES
and _current_details.status == JobStatus.FINALIZING
and "The activity completed successfully. Finalizing run..." in content
):
break
file_handle.write("\n")
file_handle.write("Execution Summary\n")
file_handle.write("=================\n")
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
warnings = _current_details.warnings
if warnings:
messages = [x.message for x in warnings if x.message]
if len(messages) > 0:
file_handle.write("\nWarnings:\n")
for message in messages:
file_handle.write(message + "\n")
file_handle.write("\n")
if _current_details.status == JobStatus.FAILED:
error = (
_current_details.error.as_dict()
if _current_details.error
else "Detailed error not set on the Run. Please check the logs for details."
)
# If we are raising the error later on, don't double print it here.
if not raise_exception_on_failed_job:
file_handle.write("\nError:\n")
file_handle.write(json.dumps(error, indent=4))
file_handle.write("\n")
else:
> raise JobException(
message="Exception : \n {} ".format(json.dumps(error, indent=4)),
target=ErrorTarget.JOB,
no_personal_data_message="Exception raised on failed job.",
error_category=ErrorCategory.SYSTEM_ERROR,
)
E azure.ai.ml.exceptions.JobException: Exception :
E {
E "error": {
E "code": "UserError",
E "message": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latest",
E "message_format": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latest",
E "message_parameters": {},
E "reference_code": "JobSubmissionFailure",
E "details": []
E },
E "time": "0001-01-01T00:00:00.000Z",
E "component_name": "JobSubmission"
E }
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/operations/_job_ops_helper.py:334: JobException
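Note: the aggregator parametrizations fail at submission time (JobSubmissionFailure) rather than at run time, so the same unresolved-component error can in principle be caught before the 4h test run. Below is a hedged sketch that loads the pipeline definition and validates it before create_or_update; the YAML path is a placeholder and validate() is an experimental azure-ai-ml API, so treat this as an assumption rather than the test's actual flow.

# Hypothetical pre-submission check; adjust the YAML path to the repository layout.
from azure.ai.ml import MLClient, load_job
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
pipeline_job = load_job("tests/benchmark_result_aggregator/benchmark_result_aggregator_pipeline.yaml")

# validate() is experimental; it reports unresolved component references
# without actually submitting the job.
validation = ml_client.jobs.validate(pipeline_job)
if not validation.passed:
    for location, messages in validation.error_messages.items():
        print(location, messages)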
github-actions / Test Results for assets-test
test_benchmark_result_aggregator_component[benchmark_result_aggregator_pipeline_no_perf.yaml-False-True] (component/benchmark_r…lt_aggregator.tests.benchmark_result_aggregator.test_benchmark_result_aggregator.TestBenchmarkResultAggregatorComponent) failed
pytest-reports/component/benchmark_result_aggregator.xml [took 7s]
Raw output
azure.ai.ml.exceptions.JobException: Exception :
{
"error": {
"code": "UserError",
"message": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
"message_format": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
"message_parameters": {},
"reference_code": "JobSubmissionFailure",
"details": []
},
"time": "0001-01-01T00:00:00.000Z",
"component_name": "JobSubmission"
}
self = <tests.benchmark_result_aggregator.test_benchmark_result_aggregator.TestBenchmarkResultAggregatorComponent object at 0x7fa8daa73690>
temp_dir = '/tmp/pytest-of-runner/pytest-0/test_benchmark_result_aggregat2'
pipeline_file_name = 'benchmark_result_aggregator_pipeline_no_perf.yaml'
has_perf_step = False, has_quality_step = True
@pytest.mark.parametrize(
"pipeline_file_name, has_perf_step, has_quality_step",
[
("benchmark_result_aggregator_pipeline.yaml", True, True),
("benchmark_result_aggregator_pipeline_no_quality.yaml", True, False),
("benchmark_result_aggregator_pipeline_no_perf.yaml", False, True),
],
)
def test_benchmark_result_aggregator_component(
self,
temp_dir: str,
pipeline_file_name: str,
has_perf_step: bool,
has_quality_step: bool,
) -> None:
"""Benchmark result aggregator component test."""
ml_client = get_mlclient()
pipeline_job = self._get_pipeline_job(
pipeline_file_name,
self.test_benchmark_result_aggregator_component.__name__,
)
# submit the pipeline job
pipeline_job = ml_client.create_or_update(
pipeline_job, experiment_name=self.EXP_NAME
)
> ml_client.jobs.stream(pipeline_job.name)
../../tests/benchmark_result_aggregator/test_benchmark_result_aggregator.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/core/tracing/decorator.py:116: in wrapper_use_tracer
return func(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/_telemetry/activity.py:288: in wrapper
return f(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/operations/_job_operations.py:838: in stream
self._stream_logs_until_completion(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
run_operations = <azure.ai.ml.operations._run_operations.RunOperations object at 0x7fa8d9d46010>
job_resource = <azure.ai.ml._restclient.v2024_01_01_preview.models._models_py3.JobBase object at 0x7fa8da077390>
datastore_operations = <azure.ai.ml.operations._datastore_operations.DatastoreOperations object at 0x7fa8d9e42b90>
raise_exception_on_failed_job = True
def stream_logs_until_completion(
run_operations: RunOperations,
job_resource: JobBase,
datastore_operations: Optional[DatastoreOperations] = None,
raise_exception_on_failed_job: bool = True,
*,
requests_pipeline: HttpPipeline
) -> None:
"""Stream the experiment run output to the specified file handle. By default the the file handle points to stdout.
:param run_operations: The run history operations class.
:type run_operations: RunOperations
:param job_resource: The job to stream
:type job_resource: JobBase
:param datastore_operations: Optional, the datastore operations class, used to get logs from datastore
:type datastore_operations: Optional[DatastoreOperations]
:param raise_exception_on_failed_job: Should this method fail if job fails
:type raise_exception_on_failed_job: Boolean
:keyword requests_pipeline: The HTTP pipeline to use for requests.
:type requests_pipeline: ~azure.ai.ml._utils._http_utils.HttpPipeline
:return:
:rtype: None
"""
job_type = job_resource.properties.job_type
job_name = job_resource.name
studio_endpoint = job_resource.properties.services.get("Studio", None)
studio_endpoint = studio_endpoint.endpoint if studio_endpoint else None
# Feature store jobs should be linked to the Feature Store Workspace UI.
# Todo: Consolidate this logic to service side
if "azureml.FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["azureml.FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["azureml.FeatureSetName"],
fset_version=job_resource.properties.properties["azureml.FeatureSetVersion"],
run_id=job_name,
)
elif "FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["FeatureSetName"],
fset_version=job_resource.properties.properties["FeatureSetVersion"],
run_id=job_name,
)
file_handle = sys.stdout
ds_properties = None
prefix = None
if (
hasattr(job_resource.properties, "outputs")
and job_resource.properties.job_type != RestJobType.AUTO_ML
and datastore_operations
):
# Get default output location
default_output = (
job_resource.properties.outputs.get("default", None) if job_resource.properties.outputs else None
)
is_uri_folder = default_output and default_output.job_output_type == DataType.URI_FOLDER
if is_uri_folder:
output_uri = default_output.uri # type: ignore
# Parse the uri format
output_uri = output_uri.split("datastores/")[1]
datastore_name, prefix = output_uri.split("/", 1)
ds_properties = get_datastore_info(datastore_operations, datastore_name)
try:
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
_current_details: RunDetails = run_operations.get_run_details(job_name)
processed_logs: Dict = {}
poll_start_time = time.time()
pipeline_with_retries = create_requests_pipeline_with_retry(requests_pipeline=requests_pipeline)
while (
_current_details.status in RunHistoryConstants.IN_PROGRESS_STATUSES
or _current_details.status == JobStatus.FINALIZING
):
file_handle.flush()
time.sleep(_wait_before_polling(time.time() - poll_start_time))
_current_details = run_operations.get_run_details(job_name) # TODO use FileWatcher
if job_type.lower() in JobType.PIPELINE:
legacy_folder_name = "/logs/azureml/"
else:
legacy_folder_name = "/azureml-logs/"
_current_logs_dict = (
list_logs_in_datastore(
ds_properties,
prefix=str(prefix),
legacy_log_folder_name=legacy_folder_name,
)
if ds_properties is not None
else _current_details.log_files
)
# Get the list of new logs available after filtering out the processed ones
available_logs = _get_sorted_filtered_logs(_current_logs_dict, job_type, processed_logs)
content = ""
for current_log in available_logs:
content = download_text_from_url(
_current_logs_dict[current_log],
pipeline_with_retries,
timeout=RunHistoryConstants._DEFAULT_GET_CONTENT_TIMEOUT,
)
_incremental_print(content, processed_logs, current_log, file_handle)
# TODO: Temporary solution to wait for all the logs to be printed in the finalizing state.
if (
_current_details.status not in RunHistoryConstants.IN_PROGRESS_STATUSES
and _current_details.status == JobStatus.FINALIZING
and "The activity completed successfully. Finalizing run..." in content
):
break
file_handle.write("\n")
file_handle.write("Execution Summary\n")
file_handle.write("=================\n")
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
warnings = _current_details.warnings
if warnings:
messages = [x.message for x in warnings if x.message]
if len(messages) > 0:
file_handle.write("\nWarnings:\n")
for message in messages:
file_handle.write(message + "\n")
file_handle.write("\n")
if _current_details.status == JobStatus.FAILED:
error = (
_current_details.error.as_dict()
if _current_details.error
else "Detailed error not set on the Run. Please check the logs for details."
)
# If we are raising the error later on, don't double print it here.
if not raise_exception_on_failed_job:
file_handle.write("\nError:\n")
file_handle.write(json.dumps(error, indent=4))
file_handle.write("\n")
else:
> raise JobException(
message="Exception : \n {} ".format(json.dumps(error, indent=4)),
target=ErrorTarget.JOB,
no_personal_data_message="Exception raised on failed job.",
error_category=ErrorCategory.SYSTEM_ERROR,
)
E azure.ai.ml.exceptions.JobException: Exception :
E {
E "error": {
E "code": "UserError",
E "message": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
E "message_format": "Failed to submit job due to key: azureml://registries/azureml/components/model_prediction/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/model_prediction/labels/latestkey: azureml://registries/azureml/components/compute_metrics/labels/latest, Message: Invalid asset id azureml://registries/azureml/components/compute_metrics/labels/latest",
E "message_parameters": {},
E "reference_code": "JobSubmissionFailure",
E "details": []
E },
E "time": "0001-01-01T00:00:00.000Z",
E "component_name": "JobSubmission"
E }
/usr/share/miniconda/envs/isolated_1740722483737/lib/python3.11/site-packages/azure/ai/ml/operations/_job_ops_helper.py:334: JobException
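Note: the long SDK excerpt repeated in each traceback is stream_logs_until_completion, which polls run details with a backoff until the job leaves its in-progress states, prints any new logs, and finally raises JobException for failed runs. A minimal sketch of that polling pattern using only public client calls follows; the terminal status names are assumptions rather than the SDK's internal constants.

# Minimal sketch of the wait loop that ml_client.jobs.stream() performs internally.
import time
from azure.ai.ml import MLClient

TERMINAL_STATUSES = {"Completed", "Failed", "Canceled"}  # assumed terminal states

def wait_for_job(ml_client: MLClient, job_name: str, poll_seconds: float = 30.0) -> str:
    """Poll job status until it reaches a terminal state and return that state."""
    while True:
        status = ml_client.jobs.get(job_name).status
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)

# A test could assert on the returned status (and fetch the error details itself)
# instead of relying on the JobException raised by stream().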
github-actions / Test Results for assets-test
test_dataset_downloader_component[None-all-test-/home/runner/work/azureml-assets/azureml-assets/assets/aml-benchmark/scripts/da…/math.py] (component/dataset_downloader.tests.dataset_downloader.test_dataset_downloader.TestDatasetDownloaderComponent) failed
pytest-reports/component/dataset_downloader.xml [took 11m 54s]
Raw output
azure.ai.ml.exceptions.JobException: Exception :
{
"error": {
"code": "UserError",
"message": "Pipeline has failed child jobs. Failed nodes: /run_dataset_downloader. For more details and logs, please go to the job detail page and check the child jobs.",
"message_format": "Pipeline has failed child jobs. {0}",
"message_parameters": {},
"reference_code": "PipelineHasStepJobFailed",
"details": []
},
"environment": "eastus",
"location": "eastus",
"time": "2025-02-28T06:38:15.974417Z",
"component_name": ""
}
self = <tests.dataset_downloader.test_dataset_downloader.TestDatasetDownloaderComponent object at 0x7fcc16c04650>
temp_dir = '/tmp/pytest-of-runner/pytest-0/test_dataset_downloader_compon4'
dataset_name = None, configuration = 'all', split = 'test'
script = '/home/runner/work/azureml-assets/azureml-assets/assets/aml-benchmark/scripts/data_loaders/math.py'
@pytest.mark.parametrize(
"dataset_name, configuration, split, script",
[
("xquad", "xquad.en", "validation", None),
("xquad", "xquad.en,xquad.hi", "all", None),
("xquad", "all", "all", None),
("cifar10", "all", "test", None),
(None, "all", "test", Constants.MATH_DATASET_LOADER_SCRIPT),
],
)
def test_dataset_downloader_component(
self,
temp_dir: str,
dataset_name: Union[str, None],
configuration: Union[str, None],
split: Union[str, None],
script: Union[str, None],
) -> None:
"""Dataset Downloader component test."""
ml_client = get_mlclient()
pipeline_job = self._get_pipeline_job(
dataset_name,
configuration,
split,
script,
self.test_dataset_downloader_component.__name__,
)
# submit the pipeline job
pipeline_job = ml_client.create_or_update(
pipeline_job, experiment_name=self.EXP_NAME
)
> ml_client.jobs.stream(pipeline_job.name)
../../tests/dataset_downloader/test_dataset_downloader.py:68:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/share/miniconda/envs/isolated_1740722439336/lib/python3.11/site-packages/azure/core/tracing/decorator.py:116: in wrapper_use_tracer
return func(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722439336/lib/python3.11/site-packages/azure/ai/ml/_telemetry/activity.py:288: in wrapper
return f(*args, **kwargs)
/usr/share/miniconda/envs/isolated_1740722439336/lib/python3.11/site-packages/azure/ai/ml/operations/_job_operations.py:838: in stream
self._stream_logs_until_completion(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
run_operations = <azure.ai.ml.operations._run_operations.RunOperations object at 0x7fcc0fe53f90>
job_resource = <azure.ai.ml._restclient.v2024_01_01_preview.models._models_py3.JobBase object at 0x7fcc140af9d0>
datastore_operations = <azure.ai.ml.operations._datastore_operations.DatastoreOperations object at 0x7fcc0ff9a4d0>
raise_exception_on_failed_job = True
def stream_logs_until_completion(
run_operations: RunOperations,
job_resource: JobBase,
datastore_operations: Optional[DatastoreOperations] = None,
raise_exception_on_failed_job: bool = True,
*,
requests_pipeline: HttpPipeline
) -> None:
"""Stream the experiment run output to the specified file handle. By default the the file handle points to stdout.
:param run_operations: The run history operations class.
:type run_operations: RunOperations
:param job_resource: The job to stream
:type job_resource: JobBase
:param datastore_operations: Optional, the datastore operations class, used to get logs from datastore
:type datastore_operations: Optional[DatastoreOperations]
:param raise_exception_on_failed_job: Should this method fail if job fails
:type raise_exception_on_failed_job: Boolean
:keyword requests_pipeline: The HTTP pipeline to use for requests.
:type requests_pipeline: ~azure.ai.ml._utils._http_utils.HttpPipeline
:return:
:rtype: None
"""
job_type = job_resource.properties.job_type
job_name = job_resource.name
studio_endpoint = job_resource.properties.services.get("Studio", None)
studio_endpoint = studio_endpoint.endpoint if studio_endpoint else None
# Feature store jobs should be linked to the Feature Store Workspace UI.
# Todo: Consolidate this logic to service side
if "azureml.FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["azureml.FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["azureml.FeatureSetName"],
fset_version=job_resource.properties.properties["azureml.FeatureSetVersion"],
run_id=job_name,
)
elif "FeatureStoreJobType" in job_resource.properties.properties:
url_format = (
"https://ml.azure.com/featureStore/{fs_name}/featureSets/{fset_name}/{fset_version}/matJobs/"
"jobs/{run_id}?wsid=/subscriptions/{fs_sub_id}/resourceGroups/{fs_rg_name}/providers/"
"Microsoft.MachineLearningServices/workspaces/{fs_name}"
)
studio_endpoint = url_format.format(
fs_name=job_resource.properties.properties["FeatureStoreName"],
fs_sub_id=run_operations._subscription_id,
fs_rg_name=run_operations._resource_group_name,
fset_name=job_resource.properties.properties["FeatureSetName"],
fset_version=job_resource.properties.properties["FeatureSetVersion"],
run_id=job_name,
)
file_handle = sys.stdout
ds_properties = None
prefix = None
if (
hasattr(job_resource.properties, "outputs")
and job_resource.properties.job_type != RestJobType.AUTO_ML
and datastore_operations
):
# Get default output location
default_output = (
job_resource.properties.outputs.get("default", None) if job_resource.properties.outputs else None
)
is_uri_folder = default_output and default_output.job_output_type == DataType.URI_FOLDER
if is_uri_folder:
output_uri = default_output.uri # type: ignore
# Parse the uri format
output_uri = output_uri.split("datastores/")[1]
datastore_name, prefix = output_uri.split("/", 1)
ds_properties = get_datastore_info(datastore_operations, datastore_name)
try:
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
_current_details: RunDetails = run_operations.get_run_details(job_name)
processed_logs: Dict = {}
poll_start_time = time.time()
pipeline_with_retries = create_requests_pipeline_with_retry(requests_pipeline=requests_pipeline)
while (
_current_details.status in RunHistoryConstants.IN_PROGRESS_STATUSES
or _current_details.status == JobStatus.FINALIZING
):
file_handle.flush()
time.sleep(_wait_before_polling(time.time() - poll_start_time))
_current_details = run_operations.get_run_details(job_name) # TODO use FileWatcher
if job_type.lower() in JobType.PIPELINE:
legacy_folder_name = "/logs/azureml/"
else:
legacy_folder_name = "/azureml-logs/"
_current_logs_dict = (
list_logs_in_datastore(
ds_properties,
prefix=str(prefix),
legacy_log_folder_name=legacy_folder_name,
)
if ds_properties is not None
else _current_details.log_files
)
# Get the list of new logs available after filtering out the processed ones
available_logs = _get_sorted_filtered_logs(_current_logs_dict, job_type, processed_logs)
content = ""
for current_log in available_logs:
content = download_text_from_url(
_current_logs_dict[current_log],
pipeline_with_retries,
timeout=RunHistoryConstants._DEFAULT_GET_CONTENT_TIMEOUT,
)
_incremental_print(content, processed_logs, current_log, file_handle)
# TODO: Temporary solution to wait for all the logs to be printed in the finalizing state.
if (
_current_details.status not in RunHistoryConstants.IN_PROGRESS_STATUSES
and _current_details.status == JobStatus.FINALIZING
and "The activity completed successfully. Finalizing run..." in content
):
break
file_handle.write("\n")
file_handle.write("Execution Summary\n")
file_handle.write("=================\n")
file_handle.write("RunId: {}\n".format(job_name))
file_handle.write("Web View: {}\n".format(studio_endpoint))
warnings = _current_details.warnings
if warnings:
messages = [x.message for x in warnings if x.message]
if len(messages) > 0:
file_handle.write("\nWarnings:\n")
for message in messages:
file_handle.write(message + "\n")
file_handle.write("\n")
if _current_details.status == JobStatus.FAILED:
error = (
_current_details.error.as_dict()
if _current_details.error
else "Detailed error not set on the Run. Please check the logs for details."
)
# If we are raising the error later on, don't double print it here.
if not raise_exception_on_failed_job:
file_handle.write("\nError:\n")
file_handle.write(json.dumps(error, indent=4))
file_handle.write("\n")
else:
> raise JobException(
message="Exception : \n {} ".format(json.dumps(error, indent=4)),
target=ErrorTarget.JOB,
no_personal_data_message="Exception raised on failed job.",
error_category=ErrorCategory.SYSTEM_ERROR,
)
E azure.ai.ml.exceptions.JobException: Exception :
E {
E "error": {
E "code": "UserError",
E "message": "Pipeline has failed child jobs. Failed nodes: /run_dataset_downloader. For more details and logs, please go to the job detail page and check the child jobs.",
E "message_format": "Pipeline has failed child jobs. {0}",
E "message_parameters": {},
E "reference_code": "PipelineHasStepJobFailed",
E "details": []
E },
E "environment": "eastus",
E "location": "eastus",
E "time": "2025-02-28T06:38:15.974417Z",
E "component_name": ""
E }
/usr/share/miniconda/envs/isolated_1740722439336/lib/python3.11/site-packages/azure/ai/ml/operations/_job_ops_helper.py:334: JobException
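Note: unlike the aggregator failures, this pipeline submitted successfully and a child node (/run_dataset_downloader) failed, so the useful logs live on the child job. The sketch below pulls those child-job logs locally rather than through the studio page the error message points to; the parent run id and download directory are placeholders.

# Sketch: inspect the failed child job of a pipeline run from the SDK side.
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
parent_run_id = "<RunId of the dataset downloader pipeline job>"  # placeholder

for child in ml_client.jobs.list(parent_job_name=parent_run_id):
    if child.status == "Failed":
        print(f"Failed child job: {child.display_name} ({child.name})")
        # Download outputs and logs of the failed node for offline inspection.
        ml_client.jobs.download(name=child.name, download_path="./failed_node_logs", all=True)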