From d1036260ad971d4d415b71e561741447abb6c3d8 Mon Sep 17 00:00:00 2001 From: Abhishek Chauhan Date: Mon, 18 Nov 2024 20:26:59 +0530 Subject: [PATCH 1/5] parallel processing added in evaluation module --- agentneo/evaluation/evaluation.py | 51 ++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/agentneo/evaluation/evaluation.py b/agentneo/evaluation/evaluation.py index bd3b8950..dc6effd3 100644 --- a/agentneo/evaluation/evaluation.py +++ b/agentneo/evaluation/evaluation.py @@ -5,6 +5,8 @@ from typing import Optional import json import ast +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime from ..data import ( ProjectInfoModel, @@ -22,8 +24,6 @@ execute_plan_adaptibility_metric, ) -from datetime import datetime - class Evaluation: def __init__(self, session, trace_id): self.user_session = session @@ -38,17 +38,41 @@ def __init__(self, session, trace_id): self.trace_data = self.get_trace_data() def evaluate(self, metric_list=[], config={}, metadata={}): - for metric in metric_list: - start_time = datetime.now() - result = self._execute_metric(metric, config, metadata) - end_time = datetime.now() - duration = (end_time - start_time).total_seconds() + """ + Evaluate a list of metrics in parallel. + """ + results = [] + with ThreadPoolExecutor() as executor: + # Submit all metrics for parallel execution + future_to_metric = { + executor.submit(self._execute_and_save_metric, metric, config, metadata): metric + for metric in metric_list + } - self._save_metric_result(metric, result, start_time, end_time, duration) + for future in as_completed(future_to_metric): + metric = future_to_metric[future] + try: + # Get the result of the future + result = future.result() + results.append(result) + except Exception as e: + print(f"Metric {metric} failed with exception: {e}") self.session.commit() self.session.close() + def _execute_and_save_metric(self, metric, config, metadata): + """ + Execute a metric and save its result. 
+ """ + start_time = datetime.now() + result = self._execute_metric(metric, config, metadata) + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + self._save_metric_result(metric, result, start_time, end_time, duration) + return result + def _execute_metric(self, metric, config, metadata): if metric == 'goal_decomposition_efficiency': return execute_goal_decomposition_efficiency_metric( @@ -74,7 +98,6 @@ def _execute_metric(self, metric, config, metadata): else: raise ValueError("provided metric name is not supported.") - def _save_metric_result(self, metric, result, start_time, end_time, duration): metric_entry = MetricModel( trace_id=self.trace_id, @@ -122,9 +145,8 @@ def get_trace_data(self): return json_data[0] - except: - raise ValueError("Unable to load the trace data.") - + except Exception as e: + raise ValueError(f"Unable to load the trace data: {e}") def serialize_trace(self, trace): """Convert a TraceModel object into a dictionary, including all related objects.""" @@ -149,12 +171,8 @@ def serialize_trace(self, trace): { 'id': agent_call.id, 'name': agent_call.name, - # 'input_parameters': self.parse_json_field(agent_call.input_parameters), - # 'output': self.parse_json_field(agent_call.output), 'start_time': agent_call.start_time.isoformat() if agent_call.start_time else None, 'end_time': agent_call.end_time.isoformat() if agent_call.end_time else None, - # 'duration': agent_call.duration, - # 'memory_used': agent_call.memory_used 'llm_call_ids': self.parse_json_field(agent_call.llm_call_ids), 'tool_call_ids': self.parse_json_field(agent_call.tool_call_ids), 'user_interaction_ids': self.parse_json_field(agent_call.user_interaction_ids), @@ -206,7 +224,6 @@ def serialize_trace(self, trace): } for interaction in trace.user_interactions ] } - def parse_json_field(self, field): """Parse a JSON string field into a Python object, if necessary.""" From 32cb7fe871672e5e3add375a9fd9a0338a897b55 Mon Sep 17 00:00:00 2001 From: Abhishek Chauhan Date: Mon, 18 Nov 2024 21:33:52 +0530 Subject: [PATCH 2/5] added tests + updated the docs --- agentneo/evaluation/evaluation.py | 16 ++++++- docs/content/metrics/configuration.md | 1 + docs/content/metrics/overview.md | 2 +- docs/content/metrics/supported-metrics.md | 5 ++- tests/test_agentneo.py | 53 +++++++++++++++++++++++ 5 files changed, 72 insertions(+), 5 deletions(-) diff --git a/agentneo/evaluation/evaluation.py b/agentneo/evaluation/evaluation.py index dc6effd3..59e4ca01 100644 --- a/agentneo/evaluation/evaluation.py +++ b/agentneo/evaluation/evaluation.py @@ -37,18 +37,28 @@ def __init__(self, session, trace_id): self.trace_data = self.get_trace_data() - def evaluate(self, metric_list=[], config={}, metadata={}): + def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None): """ Evaluate a list of metrics in parallel. + + Parameters: + - metric_list: List of metrics to evaluate. + - config: Configuration settings for the evaluation. + - metadata: Metadata for the evaluation. + - max_workers: The maximum number of workers to use for parallel execution. + If None, it will use the default number of workers based on the system. 
""" results = [] - with ThreadPoolExecutor() as executor: + + # Use ThreadPoolExecutor with max_workers if provided + with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all metrics for parallel execution future_to_metric = { executor.submit(self._execute_and_save_metric, metric, config, metadata): metric for metric in metric_list } + # Collect results as they are completed for future in as_completed(future_to_metric): metric = future_to_metric[future] try: @@ -58,9 +68,11 @@ def evaluate(self, metric_list=[], config={}, metadata={}): except Exception as e: print(f"Metric {metric} failed with exception: {e}") + # Commit session and close it self.session.commit() self.session.close() + def _execute_and_save_metric(self, metric, config, metadata): """ Execute a metric and save its result. diff --git a/docs/content/metrics/configuration.md b/docs/content/metrics/configuration.md index 31141218..eb207d92 100644 --- a/docs/content/metrics/configuration.md +++ b/docs/content/metrics/configuration.md @@ -25,6 +25,7 @@ exe.evaluate( metric_list=['metric_name'], config=config, metadata=metadata + max_workers = 4 ) ``` diff --git a/docs/content/metrics/overview.md b/docs/content/metrics/overview.md index a1698507..75258687 100644 --- a/docs/content/metrics/overview.md +++ b/docs/content/metrics/overview.md @@ -18,7 +18,7 @@ from agentneo import Evaluation exe = Evaluation(session=neo_session, trace_id=tracer.trace_id) # Run evaluation -exe.evaluate(metric_list=['goal_decomposition_efficiency']) +exe.evaluate(metric_list=['goal_decomposition_efficiency'], max_workers = 4 ) # Get results results = exe.get_results() diff --git a/docs/content/metrics/supported-metrics.md b/docs/content/metrics/supported-metrics.md index 38ba1c43..3420eed1 100644 --- a/docs/content/metrics/supported-metrics.md +++ b/docs/content/metrics/supported-metrics.md @@ -58,13 +58,14 @@ exe.evaluate(metric_list=['tool_call_success_rate']) ## Using Multiple Metrics ```python -# Evaluate multiple metrics together +# Evaluate multiple metrics together exe.evaluate( metric_list=[ 'goal_decomposition_efficiency', 'goal_fulfillment_rate', 'tool_call_correctness_rate', 'tool_call_success_rate' - ] + ], + max_workers = 4 ) ``` \ No newline at end of file diff --git a/tests/test_agentneo.py b/tests/test_agentneo.py index 7991940b..f48ce590 100644 --- a/tests/test_agentneo.py +++ b/tests/test_agentneo.py @@ -3,6 +3,15 @@ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from unittest import mock +from concurrent.futures import ThreadPoolExecutor +import time + + +def mock_task(id): + """Mock task for testing, just a dummy task.""" + time.sleep(1) + return f"Task {id} completed" + @pytest.fixture(scope="module") def setup_engine(): @@ -15,11 +24,20 @@ def setup_engine(): Base.metadata.create_all(engine) yield engine + @pytest.fixture def agentneo_instance(setup_engine): """Fixture to provide a clean instance of AgentNeo for each test.""" return AgentNeo() + +@pytest.fixture +def executor(): + """Fixture for creating a ThreadPoolExecutor.""" + with ThreadPoolExecutor(max_workers=3) as executor: + yield executor + + def test_connect_project(agentneo_instance): """Test connecting to an existing project.""" project_name = "Connect Project" @@ -35,6 +53,7 @@ def test_connect_project(agentneo_instance): assert connected_id == agentneo_instance.project_id assert agentneo_instance.project_name == project_name + def test_create_project(agentneo_instance): """Test creating a new project.""" project_name = 
"Test Project" @@ -49,6 +68,7 @@ def test_create_project(agentneo_instance): with pytest.raises(ValueError, match=f"Project '{project_name}' already exists."): agentneo_instance.create_project(project_name) + def test_list_projects(agentneo_instance): """Test listing projects.""" project_names = ["Project 1", "Project 2", "Project 3"] @@ -72,6 +92,39 @@ def test_list_projects(agentneo_instance): limited_projects = AgentNeo.list_projects(num_projects=2) assert len(limited_projects) == 2 + +def test_parallel_execution(agentneo_instance, executor): + """Test parallel execution with ThreadPoolExecutor, simulating metric evaluations.""" + metric_list = ["metric_1", "metric_2", "metric_3", "metric_4", "metric_5"] + + # Submit multiple tasks for each metric evaluation (simulating work) + futures = [executor.submit(mock_task, i) for i in range(len(metric_list))] + + # Ensure that all tasks complete + results = [future.result() for future in futures] + + # Assert that all tasks completed as expected + assert len(results) == len(metric_list) + assert all(f"Task {i} completed" for i in range(len(metric_list))) + + +def test_max_workers(agentneo_instance, executor): + """Test the max workers configuration in ThreadPoolExecutor.""" + + # Submit 5 tasks, but only 3 workers should be running concurrently + start_time = time.time() + futures = [executor.submit(mock_task, i) for i in range(5)] + + # Wait for all tasks to complete + for future in futures: + future.result() + + end_time = time.time() + + # Total time should be around 3 seconds since we have a max of 3 workers running concurrently + assert end_time - start_time < 5 # It should not exceed 5 seconds + + @mock.patch("agentneo.server.dashboard.launch_dashboard") def test_launch_dashboard(mock_launch_dashboard): """Test launching the dashboard.""" From 446d94b59fe1b2207f35db32bf89578edb57d8cc Mon Sep 17 00:00:00 2001 From: Abhishek Chauhan Date: Mon, 18 Nov 2024 23:09:36 +0530 Subject: [PATCH 3/5] Include progress tracking and error handling --- agentneo/evaluation/evaluation.py | 63 ++++++++++++++++++------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/agentneo/evaluation/evaluation.py b/agentneo/evaluation/evaluation.py index 59e4ca01..0fa1623c 100644 --- a/agentneo/evaluation/evaluation.py +++ b/agentneo/evaluation/evaluation.py @@ -1,12 +1,11 @@ from sqlalchemy import create_engine from sqlalchemy.orm import Session -from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.exc import SQLAlchemyError -from typing import Optional -import json -import ast from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime +import json +import ast +from tqdm import tqdm from ..data import ( ProjectInfoModel, @@ -39,7 +38,7 @@ def __init__(self, session, trace_id): def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None): """ - Evaluate a list of metrics in parallel. + Evaluate a list of metrics in parallel with progress tracking. Parameters: - metric_list: List of metrics to evaluate. 
@@ -50,7 +49,7 @@ def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None): """ results = [] - # Use ThreadPoolExecutor with max_workers if provided + # Uses ThreadPoolExecutor with max_workers if provided with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all metrics for parallel execution future_to_metric = { @@ -58,31 +57,41 @@ def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None): for metric in metric_list } - # Collect results as they are completed - for future in as_completed(future_to_metric): - metric = future_to_metric[future] - try: - # Get the result of the future - result = future.result() - results.append(result) - except Exception as e: - print(f"Metric {metric} failed with exception: {e}") + # Initialize progress bar + with tqdm(total=len(future_to_metric), desc="Evaluating Metrics") as pbar: + # Collect results as they are completed + for future in as_completed(future_to_metric): + metric = future_to_metric[future] + try: + result = future.result() + results.append(result) + except Exception as e: + print(f"Metric {metric} failed with exception: {e}") + finally: + pbar.update(1) # Update the progress bar after each metric completion # Commit session and close it self.session.commit() self.session.close() - def _execute_and_save_metric(self, metric, config, metadata): """ Execute a metric and save its result. """ start_time = datetime.now() - result = self._execute_metric(metric, config, metadata) + try: + result = self._execute_metric(metric, config, metadata) + except ValueError as e: + print(f"Error executing metric {metric}: {e}") + return None + except Exception as e: + print(f"Unexpected error while executing metric {metric}: {e}") + return None end_time = datetime.now() duration = (end_time - start_time).total_seconds() - self._save_metric_result(metric, result, start_time, end_time, duration) + if result: + self._save_metric_result(metric, result, start_time, end_time, duration) return result def _execute_metric(self, metric, config, metadata): @@ -109,15 +118,15 @@ def _execute_metric(self, metric, config, metadata): ) else: raise ValueError("provided metric name is not supported.") - + def _save_metric_result(self, metric, result, start_time, end_time, duration): metric_entry = MetricModel( trace_id=self.trace_id, metric_name=metric, - score = result['result']['score'], - reason = result['result']['reason'], - result_detail = result, - config = result['config'], + score=result['result']['score'], + reason=result['result']['reason'], + result_detail=result, + config=result['config'], start_time=start_time, end_time=end_time, duration=duration @@ -130,10 +139,10 @@ def get_results(self): for result in results: result_dict = { 'metric_name': result.metric_name, - 'score' : result.score, - 'reason' : result.reason, + 'score': result.score, + 'reason': result.reason, 'result_detail': result.result_detail, - 'config' : result.config, + 'config': result.config, 'start_time': result.start_time.isoformat() if result.start_time else None, 'end_time': result.end_time.isoformat() if result.end_time else None, 'duration': result.duration @@ -249,4 +258,4 @@ def parse_json_field(self, field): return field elif isinstance(field, (list, dict)): return field - return field + return field \ No newline at end of file From 58897b17f06a1f6197a015241e94166f86b65315 Mon Sep 17 00:00:00 2001 From: Abhishek Chauhan Date: Tue, 19 Nov 2024 16:35:21 +0530 Subject: [PATCH 4/5] max_evaluations_per_thread added + updated docs with 
example --- agentneo/evaluation/evaluation.py | 79 +++++++-- ai_travel_agent.py | 7 +- docs/content/metrics/configuration.md | 3 +- docs/content/metrics/overview.md | 2 +- docs/content/metrics/supported-metrics.md | 8 +- tests/test_agentneo.py | 203 +++++++++++++++------- 6 files changed, 214 insertions(+), 88 deletions(-) diff --git a/agentneo/evaluation/evaluation.py b/agentneo/evaluation/evaluation.py index 0fa1623c..800407fa 100644 --- a/agentneo/evaluation/evaluation.py +++ b/agentneo/evaluation/evaluation.py @@ -5,7 +5,9 @@ from datetime import datetime import json import ast +import math from tqdm import tqdm +import os from ..data import ( ProjectInfoModel, @@ -36,7 +38,14 @@ def __init__(self, session, trace_id): self.trace_data = self.get_trace_data() - def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None): + @staticmethod + def chunk_metrics(metrics, chunk_size): + """Yield successive chunks of the metrics list.""" + for i in range(0, len(metrics), chunk_size): + yield metrics[i:i + chunk_size] + + + def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None, max_evaluations_per_thread=None): """ Evaluate a list of metrics in parallel with progress tracking. @@ -45,35 +54,67 @@ def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None): - config: Configuration settings for the evaluation. - metadata: Metadata for the evaluation. - max_workers: The maximum number of workers to use for parallel execution. - If None, it will use the default number of workers based on the system. + - max_evaluations_per_thread: Limit the number of evaluations a single thread handles. """ + if not metric_list: + raise ValueError("The metric list cannot be empty.") + + # Set default values for max_workers + if max_workers is None or max_workers <= 0: + max_workers = os.cpu_count() # Use all available CPU threads + + print( + f"\nParallel Processing Configuration:" + f"\n - max_workers: {max_workers}" + ) + # Ensure max_workers doesn't exceed the number of metrics + max_workers = min(max_workers, len(metric_list)) + + # Calculate max_evaluations_per_thread, ensuring it's at least 1 + if max_evaluations_per_thread is None or max_evaluations_per_thread <= 0: + max_evaluations_per_thread = max(1, math.ceil(len(metric_list) / max_workers)) + + print( + f" - max_evaluations_per_thread: {max_evaluations_per_thread}" + ) + + metric_chunks = list(self.chunk_metrics(metric_list, max_evaluations_per_thread)) results = [] - # Uses ThreadPoolExecutor with max_workers if provided with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit all metrics for parallel execution - future_to_metric = { - executor.submit(self._execute_and_save_metric, metric, config, metadata): metric - for metric in metric_list + future_to_chunk = { + executor.submit(self._process_metric_chunk, chunk, config, metadata): chunk + for chunk in metric_chunks } - - # Initialize progress bar - with tqdm(total=len(future_to_metric), desc="Evaluating Metrics") as pbar: - # Collect results as they are completed - for future in as_completed(future_to_metric): - metric = future_to_metric[future] + + with tqdm(total=len(metric_list), desc="Evaluating Metrics") as pbar: + for future in as_completed(future_to_chunk): + chunk = future_to_chunk[future] try: - result = future.result() - results.append(result) + chunk_results = future.result() + results.extend(chunk_results) except Exception as e: - print(f"Metric {metric} failed with exception: {e}") + print(f"Chunk {chunk} failed with 
exception: {e}") finally: - pbar.update(1) # Update the progress bar after each metric completion - - # Commit session and close it + pbar.update(len(chunk)) + self.session.commit() self.session.close() + + + def _process_metric_chunk(self, chunk, config, metadata): + """ + Process a chunk of metrics. + """ + results = [] + for metric in chunk: + result = self._execute_and_save_metric(metric, config, metadata) + if result: + results.append(result) + return results + + def _execute_and_save_metric(self, metric, config, metadata): """ Execute a metric and save its result. diff --git a/ai_travel_agent.py b/ai_travel_agent.py index 621c0d7d..a6323260 100644 --- a/ai_travel_agent.py +++ b/ai_travel_agent.py @@ -15,7 +15,7 @@ neo_session = AgentNeo(session_name="ai_travel_agent_session22") -project_name = "ai_travel_agent_demo22" +project_name = "ai_travel_agent_demo" try: neo_session.create_project(project_name=project_name) @@ -211,7 +211,10 @@ def travel_agent(): exe.evaluate(metric_list=['goal_decomposition_efficiency', 'goal_fulfillment_rate', 'tool_call_correctness_rate', - 'tool_call_success_rate']) + 'tool_call_success_rate'], + max_workers=2, + max_evaluations_per_thread=2 + ) # print the performance result # metric_results = exe.get_results() # print(metric_results) diff --git a/docs/content/metrics/configuration.md b/docs/content/metrics/configuration.md index eb207d92..cd652313 100644 --- a/docs/content/metrics/configuration.md +++ b/docs/content/metrics/configuration.md @@ -25,7 +25,8 @@ exe.evaluate( metric_list=['metric_name'], config=config, metadata=metadata - max_workers = 4 + max_workers = 4, + max_evaluations_per_thread=2 ) ``` diff --git a/docs/content/metrics/overview.md b/docs/content/metrics/overview.md index 75258687..080f1415 100644 --- a/docs/content/metrics/overview.md +++ b/docs/content/metrics/overview.md @@ -18,7 +18,7 @@ from agentneo import Evaluation exe = Evaluation(session=neo_session, trace_id=tracer.trace_id) # Run evaluation -exe.evaluate(metric_list=['goal_decomposition_efficiency'], max_workers = 4 ) +exe.evaluate(metric_list=['goal_decomposition_efficiency'], max_workers = 4 ,max_evaluations_per_thread=2 ) # Get results results = exe.get_results() diff --git a/docs/content/metrics/supported-metrics.md b/docs/content/metrics/supported-metrics.md index 3420eed1..17655194 100644 --- a/docs/content/metrics/supported-metrics.md +++ b/docs/content/metrics/supported-metrics.md @@ -58,7 +58,10 @@ exe.evaluate(metric_list=['tool_call_success_rate']) ## Using Multiple Metrics ```python -# Evaluate multiple metrics together +# Evaluate multiple metrics together by specifying: +# - "max_workers": Maximum number of workers to be used simultaneously. +# - "max_evaluations_per_thread": Maximum evaluations to be performed in a single thread. 
+ exe.evaluate( metric_list=[ 'goal_decomposition_efficiency', @@ -66,6 +69,7 @@ exe.evaluate( 'tool_call_correctness_rate', 'tool_call_success_rate' ], - max_workers = 4 + max_workers = 4 , + max_evaluations_per_thread=2 ) ``` \ No newline at end of file diff --git a/tests/test_agentneo.py b/tests/test_agentneo.py index f48ce590..c10d5e77 100644 --- a/tests/test_agentneo.py +++ b/tests/test_agentneo.py @@ -1,74 +1,114 @@ import pytest +from datetime import datetime +from unittest import mock +from sqlalchemy import create_engine, inspect from agentneo.agentneo import AgentNeo -from sqlalchemy import create_engine +from agentneo.evaluation.evaluation import Evaluation +from agentneo.data.data_models import ( + TraceModel, + MetricModel, + ProjectInfoModel, + SystemInfoModel, + Base +) +import json from sqlalchemy.orm import sessionmaker -from unittest import mock -from concurrent.futures import ThreadPoolExecutor -import time - - -def mock_task(id): - """Mock task for testing, just a dummy task.""" - time.sleep(1) - return f"Task {id} completed" - -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def setup_engine(): """Fixture to create a temporary SQLite engine for testing.""" engine = create_engine("sqlite:///:memory:") + Base.metadata.drop_all(engine) + + # Ensure all models are registered + from agentneo.data import data_models + + Base.metadata.create_all(engine) AgentNeo.Session = sessionmaker(bind=engine) AgentNeo.engine = engine - # Create all tables - from agentneo.data.data_models import Base - Base.metadata.create_all(engine) + + # Verify tables were created + inspector = inspect(engine) + tables = inspector.get_table_names() + assert 'traces' in tables, f"Tables created: {tables}" + yield engine - @pytest.fixture def agentneo_instance(setup_engine): """Fixture to provide a clean instance of AgentNeo for each test.""" return AgentNeo() - @pytest.fixture -def executor(): - """Fixture for creating a ThreadPoolExecutor.""" - with ThreadPoolExecutor(max_workers=3) as executor: - yield executor +def sample_trace_data(setup_engine, agentneo_instance): + """Create sample trace data in the database""" + session = AgentNeo.Session() + + # Create or connect to test project + project_name = "Test Project" + try: + agentneo_instance.create_project(project_name) + except ValueError: + agentneo_instance.connect_project(project_name) + + # Create trace + trace = TraceModel( + project_id=agentneo_instance.project_id, + start_time=datetime.now(), + end_time=datetime.now() + ) + session.add(trace) + session.flush() + + # Create system info + system_info = SystemInfoModel( + project_id=agentneo_instance.project_id, + trace_id=trace.id, + os_name="Test OS", + os_version="1.0", + python_version="3.8", + cpu_info="Test CPU", + memory_total=8589934592, + gpu_info="Test GPU", + disk_info=json.dumps({"total": 256000000000}), + installed_packages=json.dumps({"python": "3.8", "pytest": "7.0"}) + ) + session.add(system_info) + session.commit() + + return trace.id +@pytest.fixture +def evaluation_instance(agentneo_instance, sample_trace_data): + """Create an Evaluation instance for testing""" + return Evaluation(agentneo_instance, sample_trace_data) +# AgentNeo Tests def test_connect_project(agentneo_instance): """Test connecting to an existing project.""" project_name = "Connect Project" - # Try creating the project; if it already exists, pass the exception try: agentneo_instance.create_project(project_name) except ValueError as e: assert str(e) == f"Project '{project_name}' already 
exists." - # Connect to the project (should succeed if already exists or just created) connected_id = agentneo_instance.connect_project(project_name) assert connected_id == agentneo_instance.project_id assert agentneo_instance.project_name == project_name - def test_create_project(agentneo_instance): """Test creating a new project.""" project_name = "Test Project" - # Create project try: project_id = agentneo_instance.create_project(project_name) assert project_id is not None assert agentneo_instance.project_name == project_name except: - # Test for duplicate project creation with pytest.raises(ValueError, match=f"Project '{project_name}' already exists."): agentneo_instance.create_project(project_name) - def test_list_projects(agentneo_instance): """Test listing projects.""" project_names = ["Project 1", "Project 2", "Project 3"] @@ -79,54 +119,91 @@ def test_list_projects(agentneo_instance): except ValueError as e: assert str(e) == f"Project '{name}' already exists." - # List projects projects = AgentNeo.list_projects() assert len(projects) >= len(project_names) - # Verify projects are listed project_names_in_list = [p["name"] for p in projects] for name in project_names: assert name in project_names_in_list - # Test limiting the number of listed projects limited_projects = AgentNeo.list_projects(num_projects=2) assert len(limited_projects) == 2 - -def test_parallel_execution(agentneo_instance, executor): - """Test parallel execution with ThreadPoolExecutor, simulating metric evaluations.""" - metric_list = ["metric_1", "metric_2", "metric_3", "metric_4", "metric_5"] - - # Submit multiple tasks for each metric evaluation (simulating work) - futures = [executor.submit(mock_task, i) for i in range(len(metric_list))] - - # Ensure that all tasks complete - results = [future.result() for future in futures] - - # Assert that all tasks completed as expected - assert len(results) == len(metric_list) - assert all(f"Task {i} completed" for i in range(len(metric_list))) - - -def test_max_workers(agentneo_instance, executor): - """Test the max workers configuration in ThreadPoolExecutor.""" - - # Submit 5 tasks, but only 3 workers should be running concurrently - start_time = time.time() - futures = [executor.submit(mock_task, i) for i in range(5)] - - # Wait for all tasks to complete - for future in futures: - future.result() - - end_time = time.time() - - # Total time should be around 3 seconds since we have a max of 3 workers running concurrently - assert end_time - start_time < 5 # It should not exceed 5 seconds - - @mock.patch("agentneo.server.dashboard.launch_dashboard") def test_launch_dashboard(mock_launch_dashboard): """Test launching the dashboard.""" AgentNeo.launch_dashboard(port=4000) mock_launch_dashboard.assert_called_once_with(4000) + +# Evaluation Tests +def test_evaluation_initialization(evaluation_instance): + """Test proper initialization of Evaluation class""" + assert evaluation_instance.project_name == "Test Project" + assert evaluation_instance.trace_id is not None + assert evaluation_instance.engine is not None + assert evaluation_instance.session is not None + +def test_chunk_metrics(): + """Test the chunk_metrics static method""" + metrics = ['metric1', 'metric2', 'metric3', 'metric4', 'metric5'] + chunks = list(Evaluation.chunk_metrics(metrics, 2)) + assert len(chunks) == 3 + assert chunks == [['metric1', 'metric2'], ['metric3', 'metric4'], ['metric5']] + +def test_evaluate_empty_metric_list(evaluation_instance): + """Test evaluate method with empty metric list""" + with 
pytest.raises(ValueError, match="The metric list cannot be empty."): + evaluation_instance.evaluate([]) + +def test_evaluate_with_invalid_metric(evaluation_instance): + """Test evaluate method with invalid metric name""" + result = evaluation_instance.evaluate(['invalid_metric']) + +def test_evaluate_with_valid_metrics(evaluation_instance): + """Test evaluate method with valid metrics""" + metrics = ['goal_decomposition_efficiency', 'goal_fulfillment_rate'] + evaluation_instance.evaluate( + metric_list=metrics, + config={}, + metadata={}, + max_workers=2, + max_evaluations_per_thread=1 + ) + + results = evaluation_instance.get_results() + assert isinstance(results, list) + +def test_get_results(setup_engine, evaluation_instance): + """Test getting evaluation results""" + metric = MetricModel( + trace_id=evaluation_instance.trace_id, + metric_name="test_metric", + score=0.8, + reason="Test reason", + result_detail={"detail": "test"}, + config={}, + start_time=datetime.now(), + end_time=datetime.now(), + duration=1.0 + ) + evaluation_instance.session.add(metric) + evaluation_instance.session.commit() + + results = evaluation_instance.get_results() + assert len(results) > 0 + assert isinstance(results[0], dict) + assert 'metric_name' in results[0] + assert 'score' in results[0] + +def test_get_trace_data_invalid_id(agentneo_instance): + """Test getting trace data with invalid trace ID""" + with pytest.raises(ValueError): + Evaluation(agentneo_instance, -1) + +def test_parallel_processing_configuration(setup_engine, evaluation_instance): + """Test parallel processing configuration""" + metrics = ['goal_decomposition_efficiency'] * 5 + + evaluation_instance.evaluate(metrics, max_workers=2) + evaluation_instance.evaluate(metrics, max_workers=2, max_evaluations_per_thread=2) + evaluation_instance.evaluate(metrics, max_workers=-1, max_evaluations_per_thread=-1) \ No newline at end of file From 8c881198208c69c36cd543fa6918c8983da57013 Mon Sep 17 00:00:00 2001 From: Vijay Chaurasiya <98342941+vijayc9@users.noreply.github.com> Date: Thu, 16 Jan 2025 17:43:10 +0530 Subject: [PATCH 5/5] Delete tests/test_agentneo.py --- tests/test_agentneo.py | 209 ----------------------------------------- 1 file changed, 209 deletions(-) delete mode 100644 tests/test_agentneo.py diff --git a/tests/test_agentneo.py b/tests/test_agentneo.py deleted file mode 100644 index c10d5e77..00000000 --- a/tests/test_agentneo.py +++ /dev/null @@ -1,209 +0,0 @@ -import pytest -from datetime import datetime -from unittest import mock -from sqlalchemy import create_engine, inspect -from agentneo.agentneo import AgentNeo -from agentneo.evaluation.evaluation import Evaluation -from agentneo.data.data_models import ( - TraceModel, - MetricModel, - ProjectInfoModel, - SystemInfoModel, - Base -) -import json -from sqlalchemy.orm import sessionmaker - -@pytest.fixture(scope="session") -def setup_engine(): - """Fixture to create a temporary SQLite engine for testing.""" - engine = create_engine("sqlite:///:memory:") - Base.metadata.drop_all(engine) - - # Ensure all models are registered - from agentneo.data import data_models - - Base.metadata.create_all(engine) - AgentNeo.Session = sessionmaker(bind=engine) - AgentNeo.engine = engine - - # Verify tables were created - inspector = inspect(engine) - tables = inspector.get_table_names() - assert 'traces' in tables, f"Tables created: {tables}" - - yield engine - -@pytest.fixture -def agentneo_instance(setup_engine): - """Fixture to provide a clean instance of AgentNeo for each test.""" - 
return AgentNeo() - -@pytest.fixture -def sample_trace_data(setup_engine, agentneo_instance): - """Create sample trace data in the database""" - session = AgentNeo.Session() - - # Create or connect to test project - project_name = "Test Project" - try: - agentneo_instance.create_project(project_name) - except ValueError: - agentneo_instance.connect_project(project_name) - - # Create trace - trace = TraceModel( - project_id=agentneo_instance.project_id, - start_time=datetime.now(), - end_time=datetime.now() - ) - session.add(trace) - session.flush() - - # Create system info - system_info = SystemInfoModel( - project_id=agentneo_instance.project_id, - trace_id=trace.id, - os_name="Test OS", - os_version="1.0", - python_version="3.8", - cpu_info="Test CPU", - memory_total=8589934592, - gpu_info="Test GPU", - disk_info=json.dumps({"total": 256000000000}), - installed_packages=json.dumps({"python": "3.8", "pytest": "7.0"}) - ) - session.add(system_info) - session.commit() - - return trace.id - -@pytest.fixture -def evaluation_instance(agentneo_instance, sample_trace_data): - """Create an Evaluation instance for testing""" - return Evaluation(agentneo_instance, sample_trace_data) - -# AgentNeo Tests -def test_connect_project(agentneo_instance): - """Test connecting to an existing project.""" - project_name = "Connect Project" - - try: - agentneo_instance.create_project(project_name) - except ValueError as e: - assert str(e) == f"Project '{project_name}' already exists." - - connected_id = agentneo_instance.connect_project(project_name) - assert connected_id == agentneo_instance.project_id - assert agentneo_instance.project_name == project_name - -def test_create_project(agentneo_instance): - """Test creating a new project.""" - project_name = "Test Project" - - try: - project_id = agentneo_instance.create_project(project_name) - assert project_id is not None - assert agentneo_instance.project_name == project_name - except: - with pytest.raises(ValueError, match=f"Project '{project_name}' already exists."): - agentneo_instance.create_project(project_name) - -def test_list_projects(agentneo_instance): - """Test listing projects.""" - project_names = ["Project 1", "Project 2", "Project 3"] - - for name in project_names: - try: - agentneo_instance.create_project(name) - except ValueError as e: - assert str(e) == f"Project '{name}' already exists." 
- - projects = AgentNeo.list_projects() - assert len(projects) >= len(project_names) - - project_names_in_list = [p["name"] for p in projects] - for name in project_names: - assert name in project_names_in_list - - limited_projects = AgentNeo.list_projects(num_projects=2) - assert len(limited_projects) == 2 - -@mock.patch("agentneo.server.dashboard.launch_dashboard") -def test_launch_dashboard(mock_launch_dashboard): - """Test launching the dashboard.""" - AgentNeo.launch_dashboard(port=4000) - mock_launch_dashboard.assert_called_once_with(4000) - -# Evaluation Tests -def test_evaluation_initialization(evaluation_instance): - """Test proper initialization of Evaluation class""" - assert evaluation_instance.project_name == "Test Project" - assert evaluation_instance.trace_id is not None - assert evaluation_instance.engine is not None - assert evaluation_instance.session is not None - -def test_chunk_metrics(): - """Test the chunk_metrics static method""" - metrics = ['metric1', 'metric2', 'metric3', 'metric4', 'metric5'] - chunks = list(Evaluation.chunk_metrics(metrics, 2)) - assert len(chunks) == 3 - assert chunks == [['metric1', 'metric2'], ['metric3', 'metric4'], ['metric5']] - -def test_evaluate_empty_metric_list(evaluation_instance): - """Test evaluate method with empty metric list""" - with pytest.raises(ValueError, match="The metric list cannot be empty."): - evaluation_instance.evaluate([]) - -def test_evaluate_with_invalid_metric(evaluation_instance): - """Test evaluate method with invalid metric name""" - result = evaluation_instance.evaluate(['invalid_metric']) - -def test_evaluate_with_valid_metrics(evaluation_instance): - """Test evaluate method with valid metrics""" - metrics = ['goal_decomposition_efficiency', 'goal_fulfillment_rate'] - evaluation_instance.evaluate( - metric_list=metrics, - config={}, - metadata={}, - max_workers=2, - max_evaluations_per_thread=1 - ) - - results = evaluation_instance.get_results() - assert isinstance(results, list) - -def test_get_results(setup_engine, evaluation_instance): - """Test getting evaluation results""" - metric = MetricModel( - trace_id=evaluation_instance.trace_id, - metric_name="test_metric", - score=0.8, - reason="Test reason", - result_detail={"detail": "test"}, - config={}, - start_time=datetime.now(), - end_time=datetime.now(), - duration=1.0 - ) - evaluation_instance.session.add(metric) - evaluation_instance.session.commit() - - results = evaluation_instance.get_results() - assert len(results) > 0 - assert isinstance(results[0], dict) - assert 'metric_name' in results[0] - assert 'score' in results[0] - -def test_get_trace_data_invalid_id(agentneo_instance): - """Test getting trace data with invalid trace ID""" - with pytest.raises(ValueError): - Evaluation(agentneo_instance, -1) - -def test_parallel_processing_configuration(setup_engine, evaluation_instance): - """Test parallel processing configuration""" - metrics = ['goal_decomposition_efficiency'] * 5 - - evaluation_instance.evaluate(metrics, max_workers=2) - evaluation_instance.evaluate(metrics, max_workers=2, max_evaluations_per_thread=2) - evaluation_instance.evaluate(metrics, max_workers=-1, max_evaluations_per_thread=-1) \ No newline at end of file
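
A minimal usage sketch, pulling together the parallel-evaluation API as it stands after patch 4 (`max_workers` plus `max_evaluations_per_thread`). This is not part of the patch series itself: it assumes `neo_session` (an AgentNeo session) and `tracer` (an object exposing `trace_id`) already exist from the setup shown in `ai_travel_agent.py` and the docs diffs above, and it only uses calls that appear in those diffs (`Evaluation`, `evaluate`, `get_results`).

```python
# Sketch only: `neo_session` and `tracer` are assumed to come from the earlier
# AgentNeo / tracing setup shown in ai_travel_agent.py; they are not defined here.
from agentneo import Evaluation

exe = Evaluation(session=neo_session, trace_id=tracer.trace_id)

# Four metrics, two worker threads, at most two evaluations handled per thread,
# mirroring the call added to ai_travel_agent.py in patch 4. Passing None (or a
# non-positive value) for either knob falls back to the defaults added in patch 4:
# os.cpu_count() workers and the metric list split evenly across them.
exe.evaluate(
    metric_list=[
        'goal_decomposition_efficiency',
        'goal_fulfillment_rate',
        'tool_call_correctness_rate',
        'tool_call_success_rate',
    ],
    config={},
    metadata={},
    max_workers=2,
    max_evaluations_per_thread=2,
)

print(exe.get_results())
```

Per the patch 4 implementation, `evaluate` splits `metric_list` into chunks of size `max_evaluations_per_thread` via `chunk_metrics` and submits each chunk to the `ThreadPoolExecutor`, so the two parameters trade off thread count against per-thread workload while a `tqdm` bar tracks overall progress.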