
Merge pull request #107 from abhishekchauhan15/evaluation_enhancements/parallel_processing

Added parallel processing for metric evaluations + progress bar (with proper error handling)
vijayc9 authored Jan 16, 2025
2 parents b9b5cc4 + 8c88119 commit d011c6f
Showing 6 changed files with 122 additions and 112 deletions.
135 changes: 107 additions & 28 deletions agentneo/evaluation/evaluation.py
@@ -1,10 +1,13 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import json
import ast
import math
from tqdm import tqdm
import os

from ..data import (
    ProjectInfoModel,
@@ -23,8 +26,6 @@
    execute_learning_adaptability_rate_metric,
)

class Evaluation:
    def __init__(self, session, trace_id):
        self.user_session = session
@@ -38,18 +39,103 @@ def __init__(self, session, trace_id):

        self.trace_data = self.get_trace_data()

    @staticmethod
    def chunk_metrics(metrics, chunk_size):
        """Yield successive chunks of the metrics list."""
        for i in range(0, len(metrics), chunk_size):
            yield metrics[i:i + chunk_size]
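        # e.g. chunk_metrics(['m1', 'm2', 'm3', 'm4', 'm5'], 2)
        # -> [['m1', 'm2'], ['m3', 'm4'], ['m5']]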

    def evaluate(self, metric_list=[], config={}, metadata={}, max_workers=None, max_evaluations_per_thread=None):
        """
        Evaluate a list of metrics in parallel with progress tracking.

        Parameters:
        - metric_list: List of metrics to evaluate.
        - config: Configuration settings for the evaluation.
        - metadata: Metadata for the evaluation.
        - max_workers: The maximum number of worker threads to use for parallel execution.
        - max_evaluations_per_thread: Limit on the number of evaluations a single thread handles.
        """
        if not metric_list:
            raise ValueError("The metric list cannot be empty.")

        # Default to all available CPU threads, and never use more workers than metrics
        if max_workers is None or max_workers <= 0:
            max_workers = os.cpu_count()
        max_workers = min(max_workers, len(metric_list))

        # Calculate max_evaluations_per_thread, ensuring it's at least 1
        if max_evaluations_per_thread is None or max_evaluations_per_thread <= 0:
            max_evaluations_per_thread = max(1, math.ceil(len(metric_list) / max_workers))

        print(
            f"\nParallel Processing Configuration:"
            f"\n - max_workers: {max_workers}"
            f"\n - max_evaluations_per_thread: {max_evaluations_per_thread}"
        )
        metric_chunks = list(self.chunk_metrics(metric_list, max_evaluations_per_thread))
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_chunk = {
                executor.submit(self._process_metric_chunk, chunk, config, metadata): chunk
                for chunk in metric_chunks
            }

            with tqdm(total=len(metric_list), desc="Evaluating Metrics") as pbar:
                for future in as_completed(future_to_chunk):
                    chunk = future_to_chunk[future]
                    try:
                        chunk_results = future.result()
                        results.extend(chunk_results)
                    except Exception as e:
                        print(f"Chunk {chunk} failed with exception: {e}")
                    finally:
                        pbar.update(len(chunk))

        self.session.commit()
        self.session.close()
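        # Note: evaluate() does not return the collected results; call
        # get_results() afterwards to read the saved metrics back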



    def _process_metric_chunk(self, chunk, config, metadata):
        """Process a chunk of metrics sequentially within one worker thread."""
        results = []
        for metric in chunk:
            result = self._execute_and_save_metric(metric, config, metadata)
            if result:
                results.append(result)
        return results


    def _execute_and_save_metric(self, metric, config, metadata):
        """Execute a single metric and save its result."""
        start_time = datetime.now()
        try:
            result = self._execute_metric(metric, config, metadata)
        except ValueError as e:
            print(f"Error executing metric {metric}: {e}")
            return None
        except Exception as e:
            print(f"Unexpected error while executing metric {metric}: {e}")
            return None
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        if result:
            self._save_metric_result(metric, result, start_time, end_time, duration)
            return result

    def _execute_metric(self, metric, config, metadata):
        if metric == 'goal_decomposition_efficiency':
            return execute_goal_decomposition_efficiency_metric(
@@ -79,16 +165,15 @@ def _execute_metric(self, metric, config, metadata):
            )
        else:
            raise ValueError(f"Provided metric name '{metric}' is not supported.")


    def _save_metric_result(self, metric, result, start_time, end_time, duration):
        metric_entry = MetricModel(
            trace_id=self.trace_id,
            metric_name=metric,
            score=result['result']['score'],
            reason=result['result']['reason'],
            result_detail=result,
            config=result['config'],
            start_time=start_time,
            end_time=end_time,
            duration=duration
@@ -101,10 +186,10 @@ def get_results(self):
        for result in results:
            result_dict = {
                'metric_name': result.metric_name,
                'score': result.score,
                'reason': result.reason,
                'result_detail': result.result_detail,
                'config': result.config,
                'start_time': result.start_time.isoformat() if result.start_time else None,
                'end_time': result.end_time.isoformat() if result.end_time else None,
                'duration': result.duration
@@ -128,9 +213,8 @@ def get_trace_data(self):

            return json_data[0]

        except Exception as e:
            raise ValueError(f"Unable to load the trace data: {e}")

    def serialize_trace(self, trace):
        """Convert a TraceModel object into a dictionary, including all related objects."""
@@ -155,12 +239,8 @@ def serialize_trace(self, trace):
                {
                    'id': agent_call.id,
                    'name': agent_call.name,
                    'start_time': agent_call.start_time.isoformat() if agent_call.start_time else None,
                    'end_time': agent_call.end_time.isoformat() if agent_call.end_time else None,
                    'llm_call_ids': self.parse_json_field(agent_call.llm_call_ids),
                    'tool_call_ids': self.parse_json_field(agent_call.tool_call_ids),
                    'user_interaction_ids': self.parse_json_field(agent_call.user_interaction_ids),
@@ -212,7 +292,6 @@ def serialize_trace(self, trace):
                } for interaction in trace.user_interactions
            ]
        }


    def parse_json_field(self, field):
        """Parse a JSON string field into a Python object, if necessary."""
@@ -226,4 +305,4 @@ def parse_json_field(self, field):
                return field
        elif isinstance(field, (list, dict)):
            return field
        return field
7 changes: 5 additions & 2 deletions ai_travel_agent.py
@@ -15,7 +15,7 @@

neo_session = AgentNeo(session_name="ai_travel_agent_session22")

project_name = "ai_travel_agent_demo22"
project_name = "ai_travel_agent_demo"

try:
    neo_session.create_project(project_name=project_name)
@@ -211,7 +211,10 @@ def travel_agent():
exe.evaluate(metric_list=['goal_decomposition_efficiency',
                          'goal_fulfillment_rate',
                          'tool_call_correctness_rate',
                          'tool_call_success_rate'],
             max_workers=2,
             max_evaluations_per_thread=2)
# print the performance result
# metric_results = exe.get_results()
# print(metric_results)
2 changes: 2 additions & 0 deletions docs/content/metrics/configuration.md
@@ -25,6 +25,8 @@ exe.evaluate(
    metric_list=['metric_name'],
    config=config,
    metadata=metadata,
    max_workers=4,
    max_evaluations_per_thread=2
)
```
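
If `max_workers` or `max_evaluations_per_thread` is omitted (or non-positive), `evaluate` derives defaults itself. A minimal sketch of that derivation, following the implementation in `evaluation.py` above:

```python
import math
import os

metric_list = ['metric_name']  # whatever is passed to evaluate()

max_workers = os.cpu_count()                      # default when max_workers is unset
max_workers = min(max_workers, len(metric_list))  # never more workers than metrics
max_evaluations_per_thread = max(1, math.ceil(len(metric_list) / max_workers))
```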

2 changes: 1 addition & 1 deletion docs/content/metrics/overview.md
@@ -18,7 +18,7 @@ from agentneo import Evaluation
exe = Evaluation(session=neo_session, trace_id=tracer.trace_id)

# Run evaluation
exe.evaluate(metric_list=['goal_decomposition_efficiency'], max_workers=4, max_evaluations_per_thread=2)

# Get results
results = exe.get_results()
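
# Each entry is a dict with 'metric_name', 'score', 'reason', 'result_detail',
# 'config', 'start_time', 'end_time' and 'duration' (see get_results above).
# An illustrative way to print them:
for r in results:
    print(f"{r['metric_name']}: score={r['score']} ({r['duration']}s)")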
9 changes: 7 additions & 2 deletions docs/content/metrics/supported-metrics.md
@@ -70,14 +70,19 @@ exe.evaluate(metric_list=['tool_call_success_rate'])

## Using Multiple Metrics
```python
# Evaluate multiple metrics together by specifying:
# - "max_workers": maximum number of worker threads used simultaneously.
# - "max_evaluations_per_thread": maximum number of evaluations a single thread performs.

exe.evaluate(
    metric_list=[
        'goal_decomposition_efficiency',
        'goal_fulfillment_rate',
        'tool_call_correctness_rate',
        'learning_adaptability_rate',
        'tool_call_success_rate'
    ],
    max_workers=4,
    max_evaluations_per_thread=2
)
```
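
A failing metric does not abort the run: `_execute_and_save_metric` (see `evaluation.py` above) catches the exception, prints it, and skips that metric, so the remaining metrics still complete and are saved. An illustrative run with one unsupported metric name (hypothetical output, paraphrased):

```python
exe.evaluate(
    metric_list=['goal_fulfillment_rate', 'not_a_real_metric'],  # second name is invalid
    max_workers=2
)
# Approximate console output:
#   Error executing metric not_a_real_metric: Provided metric name 'not_a_real_metric' is not supported.
#   Evaluating Metrics: 100%|##########| 2/2
```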
79 changes: 0 additions & 79 deletions tests/test_agentneo.py

This file was deleted.
