Added parallel processing for metric evaluations + Progress Bar (Error handling done properly) #107

Merged
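A minimal usage sketch of what this PR adds (illustrative only: `neo_session` and `tracer` are assumed to be an existing AgentNeo session and tracer, and the metric names are the ones used in the docs changes below). Metrics are now evaluated in parallel across a thread pool, with a tqdm progress bar and per-metric error handling:

```python
from agentneo import Evaluation

# Bind an evaluator to an existing session and trace (assumed setup).
exe = Evaluation(session=neo_session, trace_id=tracer.trace_id)

# New optional parameters introduced by this PR:
#  - max_workers: upper bound on the thread pool size (defaults to os.cpu_count()).
#  - max_evaluations_per_thread: how many metrics a single worker chunk may contain.
exe.evaluate(
    metric_list=[
        'goal_decomposition_efficiency',
        'goal_fulfillment_rate',
        'tool_call_correctness_rate',
        'tool_call_success_rate'
    ],
    max_workers=4,
    max_evaluations_per_thread=2
)

results = exe.get_results()
```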
135 changes: 107 additions & 28 deletions agentneo/evaluation/evaluation.py
@@ -1,10 +1,13 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import json
import ast
import math
from tqdm import tqdm
import os

from ..data import (
    ProjectInfoModel,
@@ -22,8 +25,6 @@
    execute_plan_adaptibility_metric,
)

from datetime import datetime

class Evaluation:
    def __init__(self, session, trace_id):
        self.user_session = session
@@ -37,18 +38,103 @@ def __init__(self, session, trace_id):

        self.trace_data = self.get_trace_data()

    def evaluate(self, metric_list=[], config={}, metadata={}):
        for metric in metric_list:
            start_time = datetime.now()
            result = self._execute_metric(metric, config, metadata)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
    @staticmethod
    def chunk_metrics(metrics, chunk_size):
        """Yield successive chunks of the metrics list."""
        for i in range(0, len(metrics), chunk_size):
            yield metrics[i:i + chunk_size]
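        # Illustrative example (not in the original source): chunk_metrics(['m1', 'm2', 'm3'], 2)
        # would yield ['m1', 'm2'] and then ['m3'].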

            self._save_metric_result(metric, result, start_time, end_time, duration)

    def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None, max_evaluations_per_thread=None):
        """
        Evaluate a list of metrics in parallel with progress tracking.

        Parameters:
        - metric_list: List of metrics to evaluate.
        - config: Configuration settings for the evaluation.
        - metadata: Metadata for the evaluation.
        - max_workers: The maximum number of workers to use for parallel execution.
        - max_evaluations_per_thread: Limit the number of evaluations a single thread handles.
        """
        if not metric_list:
            raise ValueError("The metric list cannot be empty.")

        # Set default values for max_workers
        if max_workers is None or max_workers <= 0:
            max_workers = os.cpu_count()  # Use all available CPU threads

        print(
            f"\nParallel Processing Configuration:"
            f"\n - max_workers: {max_workers}"
        )
        # Ensure max_workers doesn't exceed the number of metrics
        max_workers = min(max_workers, len(metric_list))

        # Calculate max_evaluations_per_thread, ensuring it's at least 1
        if max_evaluations_per_thread is None or max_evaluations_per_thread <= 0:
            max_evaluations_per_thread = max(1, math.ceil(len(metric_list) / max_workers))
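            # e.g. (illustrative): 5 metrics with max_workers=2 -> max(1, ceil(5 / 2)) = 3 evaluations per thread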

        print(
            f" - max_evaluations_per_thread: {max_evaluations_per_thread}"
        )

        metric_chunks = list(self.chunk_metrics(metric_list, max_evaluations_per_thread))
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_chunk = {
                executor.submit(self._process_metric_chunk, chunk, config, metadata): chunk
                for chunk in metric_chunks
            }

            with tqdm(total=len(metric_list), desc="Evaluating Metrics") as pbar:
                for future in as_completed(future_to_chunk):
                    chunk = future_to_chunk[future]
                    try:
                        chunk_results = future.result()
                        results.extend(chunk_results)
                    except Exception as e:
                        print(f"Chunk {chunk} failed with exception: {e}")
                    finally:
                        pbar.update(len(chunk))

        self.session.commit()
        self.session.close()



    def _process_metric_chunk(self, chunk, config, metadata):
        """
        Process a chunk of metrics.
        """
        results = []
        for metric in chunk:
            result = self._execute_and_save_metric(metric, config, metadata)
            if result:
                results.append(result)
        return results


    def _execute_and_save_metric(self, metric, config, metadata):
        """
        Execute a metric and save its result.
        """
        start_time = datetime.now()
        try:
            result = self._execute_metric(metric, config, metadata)
        except ValueError as e:
            print(f"Error executing metric {metric}: {e}")
            return None
        except Exception as e:
            print(f"Unexpected error while executing metric {metric}: {e}")
            return None
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        if result:
            self._save_metric_result(metric, result, start_time, end_time, duration)
        return result

    def _execute_metric(self, metric, config, metadata):
        if metric == 'goal_decomposition_efficiency':
            return execute_goal_decomposition_efficiency_metric(
@@ -73,16 +159,15 @@ def _execute_metric(self, metric, config, metadata):
            )
        else:
            raise ValueError("provided metric name is not supported.")


    def _save_metric_result(self, metric, result, start_time, end_time, duration):
        metric_entry = MetricModel(
            trace_id=self.trace_id,
            metric_name=metric,
            score = result['result']['score'],
            reason = result['result']['reason'],
            result_detail = result,
            config = result['config'],
            score=result['result']['score'],
            reason=result['result']['reason'],
            result_detail=result,
            config=result['config'],
            start_time=start_time,
            end_time=end_time,
            duration=duration
@@ -95,10 +180,10 @@ def get_results(self):
        for result in results:
            result_dict = {
                'metric_name': result.metric_name,
                'score' : result.score,
                'reason' : result.reason,
                'score': result.score,
                'reason': result.reason,
                'result_detail': result.result_detail,
                'config' : result.config,
                'config': result.config,
                'start_time': result.start_time.isoformat() if result.start_time else None,
                'end_time': result.end_time.isoformat() if result.end_time else None,
                'duration': result.duration
@@ -122,9 +207,8 @@ def get_trace_data(self):

            return json_data[0]

        except:
            raise ValueError("Unable to load the trace data.")

        except Exception as e:
            raise ValueError(f"Unable to load the trace data: {e}")

    def serialize_trace(self, trace):
        """Convert a TraceModel object into a dictionary, including all related objects."""
@@ -149,12 +233,8 @@ def serialize_trace(self, trace):
                {
                    'id': agent_call.id,
                    'name': agent_call.name,
                    # 'input_parameters': self.parse_json_field(agent_call.input_parameters),
                    # 'output': self.parse_json_field(agent_call.output),
                    'start_time': agent_call.start_time.isoformat() if agent_call.start_time else None,
                    'end_time': agent_call.end_time.isoformat() if agent_call.end_time else None,
                    # 'duration': agent_call.duration,
                    # 'memory_used': agent_call.memory_used
                    'llm_call_ids': self.parse_json_field(agent_call.llm_call_ids),
                    'tool_call_ids': self.parse_json_field(agent_call.tool_call_ids),
                    'user_interaction_ids': self.parse_json_field(agent_call.user_interaction_ids),
@@ -206,7 +286,6 @@ def serialize_trace(self, trace):
                } for interaction in trace.user_interactions
            ]
        }


    def parse_json_field(self, field):
        """Parse a JSON string field into a Python object, if necessary."""
@@ -220,4 +299,4 @@ def parse_json_field(self, field):
                return field
        elif isinstance(field, (list, dict)):
            return field
        return field
7 changes: 5 additions & 2 deletions ai_travel_agent.py
@@ -15,7 +15,7 @@

neo_session = AgentNeo(session_name="ai_travel_agent_session22")

project_name = "ai_travel_agent_demo22"
project_name = "ai_travel_agent_demo"

try:
    neo_session.create_project(project_name=project_name)
@@ -211,7 +211,10 @@ def travel_agent():
exe.evaluate(metric_list=['goal_decomposition_efficiency',
                          'goal_fulfillment_rate',
                          'tool_call_correctness_rate',
                          'tool_call_success_rate'])
                          'tool_call_success_rate'],
             max_workers=2,
             max_evaluations_per_thread=2
             )
# print the performance result
# metric_results = exe.get_results()
# print(metric_results)
2 changes: 2 additions & 0 deletions docs/content/metrics/configuration.md
@@ -25,6 +25,8 @@ exe.evaluate(
    metric_list=['metric_name'],
    config=config,
    metadata=metadata,
    max_workers=4,
    max_evaluations_per_thread=2
)
```

2 changes: 1 addition & 1 deletion docs/content/metrics/overview.md
@@ -18,7 +18,7 @@ from agentneo import Evaluation
exe = Evaluation(session=neo_session, trace_id=tracer.trace_id)

# Run evaluation
exe.evaluate(metric_list=['goal_decomposition_efficiency'])
exe.evaluate(metric_list=['goal_decomposition_efficiency'], max_workers=4, max_evaluations_per_thread=2)

# Get results
results = exe.get_results()
9 changes: 7 additions & 2 deletions docs/content/metrics/supported-metrics.md
@@ -58,13 +58,18 @@ exe.evaluate(metric_list=['tool_call_success_rate'])

## Using Multiple Metrics
```python
# Evaluate multiple metrics together
# Evaluate multiple metrics together by specifying:
# - "max_workers": Maximum number of workers to be used simultaneously.
# - "max_evaluations_per_thread": Maximum evaluations to be performed in a single thread.

exe.evaluate(
    metric_list=[
        'goal_decomposition_efficiency',
        'goal_fulfillment_rate',
        'tool_call_correctness_rate',
        'tool_call_success_rate'
    ]
    ],
    max_workers=4,
    max_evaluations_per_thread=2
)
```
79 changes: 0 additions & 79 deletions tests/test_agentneo.py

This file was deleted.