#17731: Upload gtest testcase data to superset (#17950)
### Ticket
#17731 

### Problem description
The produce_data python script doesn't support gtest-generated xml
files. As a result, gtest data isn't uploaded to superset.

### What's changed
Add gtest support:
- update xml utils to handle both pytest and gtest xml files (a sketch of the gtest report shape is shown below)
- add unit tests
- add model validation to ensure the test table's (`sw_test.cicd_test`) unique constraint on (job_id, full_test_name, test_start_ts) is not violated
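
For reference, here is a minimal sketch of the gtest-style report the updated utilities now accept and the attributes they read; the suite, test, and file names are invented for illustration:

```python
# Illustrative gtest-style report; only the attributes the updated parsing
# relies on are shown (testsuite name/timestamp, testcase name/file/time).
import xml.etree.ElementTree as ET

GTEST_XML = """\
<testsuites tests="2" failures="0" timestamp="2025-02-19T12:00:00.000">
  <testsuite name="SampleFixture" tests="2" timestamp="2025-02-19T12:00:00.000">
    <testcase name="SampleTestA" file="/work/tests/tt_metal/unit_tests/sample_test.cpp" time="0.250"/>
    <testcase name="SampleTestB" file="/work/tests/tt_metal/unit_tests/sample_test.cpp" time="1.500"/>
  </testsuite>
</testsuites>
"""

root = ET.fromstring(GTEST_XML)
for testsuite in root:
    print(testsuite.attrib["name"], testsuite.attrib["timestamp"])
    for testcase in testsuite:
        # Unlike pytest's junit xml, gtest has no per-test start/end properties;
        # it only records elapsed time per test in the "time" attribute.
        print(" ", testcase.attrib["name"], testcase.attrib["file"], testcase.attrib["time"])
```

Unlike pytest's junit xml, which carries start_timestamp/end_timestamp properties per testcase and always has a single testsuite named "pytest", a gtest report can contain several testsuites and only records per-test elapsed time.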

### Checklist
- [x] New/Existing tests provide coverage for changes
https://github.com/tenstorrent/tt-metal/actions/runs/13399311809
williamlyTT authored Feb 19, 2025
1 parent b26e037 commit e820e8d
Showing 21 changed files with 12,599 additions and 57 deletions.
22 changes: 14 additions & 8 deletions infra/data_collection/cicd.py
@@ -65,19 +65,25 @@ def create_cicd_json_for_data_analysis(

test_report_exists = github_job_id in github_job_id_to_test_reports
if test_report_exists:
test_report_path = github_job_id_to_test_reports[github_job_id]
tests = get_tests_from_test_report_path(test_report_path)
tests = []
test_reports = github_job_id_to_test_reports[github_job_id]
for test_report_path in test_reports:
logger.info(f"Job id:{github_job_id} Analyzing test report {test_report_path}")
tests += get_tests_from_test_report_path(test_report_path)
else:
tests = []

logger.info(f"Found {len(tests)} tests for job {github_job_id}")

job = pydantic_models.Job(
**raw_job,
tests=tests,
)

jobs.append(job)
try:
job = pydantic_models.Job(
**raw_job,
tests=tests,
)
except ValueError as e:
logger.warning(f"Skipping insert for job {github_job_id}, model validation failed: {e}")
else:
jobs.append(job)

pipeline = pydantic_models.Pipeline(
**raw_pipeline,
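
The shape of the report mapping changes with this hunk: a job id now maps to a list of XML paths rather than a single `most_recent_tests.xml`. An illustrative sketch only; the job ids and paths below are made up:

```python
import pathlib

# Hypothetical mapping: a pytest job still contributes a single report, while a
# gtest job may produce one XML file per test executable.
github_job_id_to_test_reports = {
    1001: [pathlib.Path("test_reports_a/most_recent_tests.xml")],   # pytest job
    1002: [
        pathlib.Path("test_reports_b/unit_tests_api.xml"),          # gtest job,
        pathlib.Path("test_reports_b/unit_tests_device.xml"),       # several executables
    ],
}
```

If building the `Job` model fails validation (for example, duplicate tests), the job is now logged and skipped rather than aborting the whole upload.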
85 changes: 53 additions & 32 deletions infra/data_collection/github/workflows.py
@@ -4,7 +4,7 @@

import pathlib
import json
from datetime import datetime
from datetime import datetime, timedelta
from functools import partial
from typing import List

@@ -26,11 +26,10 @@ def get_workflow_run_uuids_to_test_reports_paths_(workflow_outputs_dir, workflow
test_report_uuid = test_report_dir.name.replace("test_reports_", "")

try:
xml_file_paths = (test_report_dir / "most_recent_tests.xml").resolve(strict=True)
# read all *.xml in test_report_dir (gtest can have one xml file per test executable)
xml_file_paths = [file.resolve(strict=True) for file in list(test_report_dir.glob("*.xml"))]
except FileNotFoundError as e:
logger.warning(
f"no pytest xml file found matching most_recent_tests.xml (likely gtest xml) in {test_report_dir}"
)
logger.warning(f"No pytest or gtest xml file found in {test_report_dir}, skipping directory.")
else:
workflow_run_test_reports_path[test_report_uuid] = xml_file_paths

@@ -134,56 +133,71 @@ def get_github_job_id_to_annotations(workflow_outputs_dir, workflow_run_id: int)
return github_job_ids_to_annotation_jsons


def get_pydantic_test_from_pytest_testcase_(testcase, default_timestamp=datetime.now()):
skipped = junit_xml_utils.get_pytest_testcase_is_skipped(testcase)
failed = junit_xml_utils.get_pytest_testcase_is_failed(testcase)
error = junit_xml_utils.get_pytest_testcase_is_error(testcase)
def get_pydantic_test_from_testcase_(testcase, default_timestamp=datetime.now(), is_pytest=True, testsuite_name=None):
skipped = junit_xml_utils.get_testcase_is_skipped(testcase)
failed = junit_xml_utils.get_testcase_is_failed(testcase)
error = junit_xml_utils.get_testcase_is_error(testcase)
success = not (failed or error)

error_message = None

# Error is a scarier thing than failure because it means there's an infra error, expose that first
if failed:
error_message = junit_xml_utils.get_pytest_failure_message(testcase)
error_message = junit_xml_utils.get_test_failure_message(testcase)

if error:
error_message = junit_xml_utils.get_pytest_error_message(testcase)
error_message = junit_xml_utils.get_test_error_message(testcase)

# Error at the beginning of a test can prevent pytest from recording timestamps at all
if not (skipped or error):
properties = junit_xml_utils.get_pytest_testcase_properties(testcase)
# Check if properties is none to see if pytest recorded the timestamps
if properties is not None:
test_start_ts = datetime.strptime(properties["start_timestamp"], "%Y-%m-%dT%H:%M:%S")
test_end_ts = datetime.strptime(properties["end_timestamp"], "%Y-%m-%dT%H:%M:%S")
if is_pytest:
properties = junit_xml_utils.get_pytest_testcase_properties(testcase)
# Check if properties is none to see if pytest recorded the timestamps
if properties is not None:
test_start_ts = datetime.strptime(properties["start_timestamp"], "%Y-%m-%dT%H:%M:%S")
test_end_ts = datetime.strptime(properties["end_timestamp"], "%Y-%m-%dT%H:%M:%S")
else:
test_start_ts = default_timestamp
test_end_ts = default_timestamp
else:
test_start_ts = default_timestamp
test_end_ts = default_timestamp
# gtest stores elapsed time for the test in the time attribute
gtest_elapsed_time = float(testcase.attrib["time"])
test_end_ts = default_timestamp + timedelta(seconds=gtest_elapsed_time)
else:
test_start_ts = default_timestamp
test_end_ts = default_timestamp

test_case_name = testcase.attrib["name"].split("[")[0]

filepath_no_ext = testcase.attrib["classname"].replace(".", "/")
filepath = f"{filepath_no_ext}.py"
if is_pytest:
filepath_no_ext = testcase.attrib["classname"].replace(".", "/")
filepath = f"{filepath_no_ext}.py"
else:
filepath = testcase.attrib["file"]
if filepath.startswith("/work/"):
filepath = filepath.lstrip("/work/")

def get_category_from_pytest_testcase_(testcase_):
def get_category_from_testcase_(testcase_, is_pytest=True):
categories = ["models", "ttnn", "tt_eager", "tt_metal"]
for category in categories:
if category in testcase_.attrib["classname"]:
identifier_attrib = "classname" if is_pytest else "file"
if category in testcase_.attrib[identifier_attrib]:
return category
return "other"

category = get_category_from_pytest_testcase_(testcase)
category = get_category_from_testcase_(testcase, is_pytest=is_pytest)

# leaving empty for now
group = None

# leaving empty for now
owner = None

full_test_name = f"{filepath}::{testcase.attrib['name']}"
if testsuite_name:
full_test_name = f"{filepath}::{testsuite_name}::{testcase.attrib['name']}"
else:
full_test_name = f"{filepath}::{testcase.attrib['name']}"

# to be populated with [] if available
config = None
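
A worked example of the filepath and full_test_name derivation above, with invented names; pytest encodes the module path in the classname attribute, while gtest carries a file attribute that in CI is typically rooted at /work/:

```python
# pytest: classname is a dotted module path, so it maps back onto a .py file.
classname = "tests.ttnn.unit_tests.test_sample"                   # invented
pytest_filepath = classname.replace(".", "/") + ".py"             # tests/ttnn/unit_tests/test_sample.py
pytest_full_test_name = f"{pytest_filepath}::test_sample_case"    # filepath::name

# gtest: the file attribute is used directly (with the "/work/" CI prefix removed
# for a typical path), and the owning testsuite name is folded into the name.
gtest_filepath = "tests/tt_metal/unit_tests/sample_test.cpp"      # was /work/tests/...
gtest_full_test_name = f"{gtest_filepath}::SampleFixture::SampleTestA"
```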
@@ -229,17 +243,24 @@ def get_tests_from_test_report_path(test_report_path):
report_root = report_root_tree.getroot()

is_pytest = junit_xml_utils.is_pytest_junit_xml(report_root)
is_gtest = junit_xml_utils.is_gtest_xml(report_root)

if is_pytest:
testsuite = report_root[0]
default_timestamp = datetime.strptime(testsuite.attrib["timestamp"], "%Y-%m-%dT%H:%M:%S.%f")

get_pydantic_test = partial(get_pydantic_test_from_pytest_testcase_, default_timestamp=default_timestamp)

if is_pytest or is_gtest:
logger.info(f"Found {len(report_root)} testsuites")
tests = []
for testcase in testsuite:
if is_valid_testcase_(testcase):
tests.append(get_pydantic_test(testcase))
for i in range(len(report_root)):
testsuite = report_root[i]
testsuite_name = testsuite.attrib.get("name") if is_gtest else None
default_timestamp = datetime.strptime(testsuite.attrib["timestamp"], "%Y-%m-%dT%H:%M:%S.%f")
get_pydantic_test = partial(
get_pydantic_test_from_testcase_,
default_timestamp=default_timestamp,
is_pytest=is_pytest,
testsuite_name=testsuite_name,
)
for testcase in testsuite:
if is_valid_testcase_(testcase):
tests.append(get_pydantic_test(testcase))

return tests
else:
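
A rough sketch of the timestamp handling above, with invented values: the testsuite timestamp is parsed as the default start, and for gtest the testcase's elapsed "time" attribute gives the end:

```python
from datetime import datetime, timedelta

suite_timestamp = "2025-02-19T12:00:00.000"    # format parsed for both report types here
test_start_ts = datetime.strptime(suite_timestamp, "%Y-%m-%dT%H:%M:%S.%f")

gtest_elapsed_time = float("2.250")            # time="2.250" on the <testcase>
test_end_ts = test_start_ts + timedelta(seconds=gtest_elapsed_time)
print(test_start_ts, test_end_ts)              # end is 2.25 s after the suite start
```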
44 changes: 28 additions & 16 deletions infra/data_collection/junit_xml_utils.py
@@ -18,33 +18,45 @@ def get_xml_file_root_element_tree(filepath):
return root_element_tree


def sanity_check_pytest_junit_xml_(root_element):
def sanity_check_test_xml_(root_element, is_pytest=True):
testsuite_count = len(root_element)

assert testsuite_count == 1, f"{len(root_element)}"

logger.debug("Asserted pytest junit xml")

if is_pytest:
assert testsuite_count == 1, f"{len(root_element)}"
logger.debug("Asserted pytest junit xml")
else:
assert testsuite_count >= 1, f"{len(root_element)}"
logger.debug("Asserted gtest xml")
return root_element


def is_pytest_junit_xml(root_element):
is_pytest = root_element[0].get("name") == "pytest"

if is_pytest:
sanity_check_pytest_junit_xml_(root_element)
sanity_check_test_xml_(root_element)

return is_pytest


def is_gtest_xml(root_element):
is_gtest = root_element[0].get("name") != "pytest"

if is_gtest:
sanity_check_test_xml_(root_element, is_pytest=False)

return is_gtest


def get_at_most_one_single_child_element_(element, tag_name):
is_expected = lambda child_: child_.tag == tag_name

potential_expected_blocks = list(filter(is_expected, element))

assert (
len(potential_expected_blocks) <= 1
), f"{len(potential_expected_blocks)} is not exactly 1 for tag name {tag_name}"
# downgrade assert to warning
if len(potential_expected_blocks) > 1:
element_name = element.attrib.get("name", "unknown_name")
logger.warning(f"{element_name} : {len(potential_expected_blocks)} is greater than 1 for tag name {tag_name}")

return potential_expected_blocks[0] if len(potential_expected_blocks) else None
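
The assert is downgraded because a gtest testcase can legitimately contain more than one failure element (for example, several failed EXPECT_* checks in one test). An invented illustration of the new behaviour:

```python
import xml.etree.ElementTree as ET

# Invented testcase with two <failure> children; this previously tripped the assert.
testcase = ET.fromstring(
    '<testcase name="SampleTestA">'
    '  <failure message="first expectation failed"/>'
    '  <failure message="second expectation failed"/>'
    '</testcase>'
)
# get_at_most_one_single_child_element_(testcase, "failure") now logs a warning
# and returns the first <failure>, so get_test_failure_message() would report
# "first expectation failed" instead of raising an AssertionError.
```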

@@ -73,31 +85,31 @@ def get_optional_child_element_exists_(parent_element, tag_name):
return get_at_most_one_single_child_element_(parent_element, tag_name) != None


def get_pytest_testcase_is_skipped(testcase_element):
def get_testcase_is_skipped(testcase_element):
return get_optional_child_element_exists_(testcase_element, "skipped")


def get_pytest_testcase_is_failed(testcase_element):
def get_testcase_is_failed(testcase_element):
return get_optional_child_element_exists_(testcase_element, "failure")


def get_pytest_testcase_is_error(testcase_element):
def get_testcase_is_error(testcase_element):
return get_optional_child_element_exists_(testcase_element, "error")


# opportunity for less copy-pasta


def get_pytest_failure_message(testcase_element):
assert get_pytest_testcase_is_failed(testcase_element)
def get_test_failure_message(testcase_element):
assert get_testcase_is_failed(testcase_element)

failure_element = get_at_most_one_single_child_element_(testcase_element, "failure")

return failure_element.attrib["message"]


def get_pytest_error_message(testcase_element):
assert get_pytest_testcase_is_error(testcase_element)
def get_test_error_message(testcase_element):
assert get_testcase_is_error(testcase_element)

error_element = get_at_most_one_single_child_element_(testcase_element, "error")

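
Both detection helpers hinge on the same heuristic: pytest names its single testsuite "pytest", and anything else is treated as gtest. A small sketch, assuming the repo root is on PYTHONPATH; the gtest suite name is invented:

```python
import xml.etree.ElementTree as ET
from infra.data_collection import junit_xml_utils

pytest_root = ET.fromstring(
    '<testsuites><testsuite name="pytest" timestamp="2025-02-19T12:00:00.000000"/></testsuites>'
)
gtest_root = ET.fromstring(
    '<testsuites><testsuite name="SampleFixture" timestamp="2025-02-19T12:00:00.000"/></testsuites>'
)

assert junit_xml_utils.is_pytest_junit_xml(pytest_root)
assert not junit_xml_utils.is_gtest_xml(pytest_root)
assert junit_xml_utils.is_gtest_xml(gtest_root)
assert not junit_xml_utils.is_pytest_junit_xml(gtest_root)
```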
16 changes: 15 additions & 1 deletion infra/data_collection/pydantic_models.py
@@ -9,7 +9,7 @@
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator


class Test(BaseModel):
@@ -74,6 +74,20 @@ class Job(BaseModel):
failure_description: Optional[str] = Field(None, description="Failure description.")
tests: List[Test] = []

# Model validator to check the unique combination constraint
@model_validator(mode="before")
def check_unique_tests(cls, values):
tests = values.get("tests", [])
seen_combinations = set()

for test in tests:
# for each job, the test constraint is full_test_name, test_start_ts
test_combination = (test.full_test_name, test.test_start_ts)
if test_combination in seen_combinations:
raise ValueError(f"Duplicate test combination found: {test_combination}")
seen_combinations.add(test_combination)
return values


class Pipeline(BaseModel):
"""
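
As a self-contained sketch of the same validator pattern (not the real Job model, whose other required fields are omitted here), the following shows how a duplicate (full_test_name, test_start_ts) pair is rejected and how the resulting ValueError can then be caught, as cicd.py now does:

```python
from datetime import datetime
from typing import List

from pydantic import BaseModel, model_validator


class SketchTest(BaseModel):
    full_test_name: str
    test_start_ts: datetime


class SketchJob(BaseModel):
    tests: List[SketchTest] = []

    @model_validator(mode="before")
    @classmethod
    def check_unique_tests(cls, values):
        seen_combinations = set()
        for test in values.get("tests", []):
            combination = (test.full_test_name, test.test_start_ts)
            if combination in seen_combinations:
                raise ValueError(f"Duplicate test combination found: {combination}")
            seen_combinations.add(combination)
        return values


test = SketchTest(full_test_name="a.py::test_case", test_start_ts=datetime(2025, 2, 19))
try:
    SketchJob(tests=[test, test])  # duplicate (full_test_name, test_start_ts)
except ValueError as e:  # pydantic's ValidationError is a subclass of ValueError
    print(f"Skipping insert, model validation failed: {e}")
```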