diff --git a/trulens_eval/tests/unit/test_tru.py b/trulens_eval/tests/unit/test_tru.py
index 3b0d473a4..4f7c3f61f 100644
--- a/trulens_eval/tests/unit/test_tru.py
+++ b/trulens_eval/tests/unit/test_tru.py
@@ -5,35 +5,23 @@
 from concurrent.futures import Future as FutureClass
 from concurrent.futures import wait
 from datetime import datetime
-import os
 from pathlib import Path
-import unittest
-from unittest import IsolatedAsyncioTestCase
 from unittest import main
 from unittest import TestCase

 from examples.expositional.end2end_apps.custom_app.custom_app import CustomApp
-from langchain.callbacks import AsyncIteratorCallbackHandler
 from langchain.chains import LLMChain
 from langchain.llms.openai import OpenAI
-from langchain.memory import ConversationSummaryBufferMemory
 from langchain.prompts import PromptTemplate
-from langchain.schema.messages import HumanMessage
-from tests.unit.test import JSONTestCase
 from tests.unit.test import optional_test

 from trulens_eval import Feedback
 from trulens_eval import Tru
 from trulens_eval import TruCustomApp
-from trulens_eval.feedback.provider.endpoint import Endpoint
+from trulens_eval.feedback.provider.hugs import Dummy
 from trulens_eval.keys import check_keys
-from trulens_eval.schema import FeedbackMode, FeedbackResult
-from trulens_eval.schema import Record
-from trulens_eval.tru_basic_app import TruBasicApp
+from trulens_eval.schema import FeedbackResult
 from trulens_eval.tru_custom_app import TruCustomApp
-from trulens_eval.utils.asynchro import sync
-from trulens_eval.feedback.provider.hugs import Dummy
-from trulens_eval.utils.json import jsonify


 class TestTru(TestCase):
@@ -276,6 +264,8 @@ def test_run_feedback_functions_wait(self):

         feedbacks = self._create_feedback_functions()

+        expected_feedback_names = set(f.name for f in feedbacks)
+
         tru = Tru()

         tru_app = TruCustomApp(app)
@@ -292,24 +282,31 @@ def test_run_feedback_functions_wait(self):
         # Check we get the right number of results.
         self.assertEqual(len(feedback_results), len(feedbacks))

-        # Check that the results are for the feedbacks we submitted.
-        self.assertEqual(set(tup[0] for tup in feedback_results), set(feedbacks))
+        # Check that the results are for the feedbacks we submitted.
+        self.assertEqual(
+            expected_feedback_names,
+            set(res.name for res in feedback_results),
+            "feedback result names do not match requested feedback names"
+        )

-        # Check that the structure of returned tuples is correct.
-        for feedback, result in feedback_results:
-            self.assertIsInstance(feedback, Feedback)
+        # Check that the returned results have the correct structure.
+        for result in feedback_results:
             self.assertIsInstance(result, FeedbackResult)
             self.assertIsInstance(result.result, float)

+        # TODO: move tests to test_add_feedbacks.
+        # Add to db.
+        tru.add_feedbacks(feedback_results)
+
         # Check that results were added to db.
-        df, feedback_names = tru.get_records_and_feedback(
+        df, returned_feedback_names = tru.get_records_and_feedback(
             app_ids = [tru_app.app_id]
         )

-        # Check we got the right feedback names.
+        # Check we got the right feedback names from db.
         self.assertEqual(
-            set(feedback.name for feedback in feedbacks),
-            set(feedback_names)
+            expected_feedback_names,
+            set(returned_feedback_names)
         )

     def test_run_feedback_functions_nowait(self):
@@ -321,6 +318,7 @@ def test_run_feedback_functions_nowait(self):

         app = self._create_custom()

         feedbacks = self._create_feedback_functions()
+        expected_feedback_names = set(f.name for f in feedbacks)

         tru = Tru()
@@ -333,7 +331,7 @@ def test_run_feedback_functions_nowait(self):

         start_time = datetime.now()

-        feedback_results = list(tru.run_feedback_functions(
+        future_feedback_results = list(tru.run_feedback_functions(
             record=record, feedback_functions=feedbacks, app=tru_app, wait=False
         ))

@@ -341,35 +339,40 @@ def test_run_feedback_functions_nowait(self):

         # Should return quickly.
         self.assertLess(
-            (end_time - start_time).total_seconds(), 1.0, # TODO: get it to return faster
+            (end_time - start_time).total_seconds(), 2.0, # TODO: get it to return faster
             "Non-blocking run_feedback_functions did not return fast enough."
         )

         # Check we get the right number of results.
-        self.assertEqual(len(feedback_results), len(feedbacks))
+        self.assertEqual(len(future_feedback_results), len(feedbacks))

-        # Check that the results are for the feedbacks we submitted.
-        self.assertEqual(set(tup[0] for tup in feedback_results), set(feedbacks))
+        feedback_results = []

-        # Check that the structure of returned tuples is correct.
-        for feedback, future_result in feedback_results:
-            self.assertIsInstance(feedback, Feedback)
+        # Check that the returned futures have the correct structure.
+        for future_result in future_feedback_results:
             self.assertIsInstance(future_result, FutureClass)

             wait([future_result])
             result = future_result.result()
+
             self.assertIsInstance(result, FeedbackResult)
             self.assertIsInstance(result.result, float)

+            feedback_results.append(result)
+
+        # TODO: move tests to test_add_feedbacks.
+        # Add to db.
+        tru.add_feedbacks(feedback_results)
+
         # Check that results were added to db.
-        df, feedback_names = tru.get_records_and_feedback(
+        df, returned_feedback_names = tru.get_records_and_feedback(
             app_ids = [tru_app.app_id]
         )

         # Check we got the right feedback names.
         self.assertEqual(
-            set(feedback.name for feedback in feedbacks),
-            set(feedback_names)
+            expected_feedback_names,
+            set(returned_feedback_names)
         )

     def test_reset_database(self):
@@ -389,11 +392,13 @@ def test_add_feedback(self):
         pass

     def test_add_feedbacks(self):
-        # TODO
+        # TODO: move testing from test_run_feedback_functions_wait and
+        # test_run_feedback_functions_nowait.
         pass

     def test_get_records_and_feedback(self):
-        # Also tested in test_run_feedback_functions_wait.
+        # Also tested in test_run_feedback_functions_wait and
+        # test_run_feedback_functions_nowait.
         # TODO
         pass

diff --git a/trulens_eval/tests/unit/test_tru_chain.py b/trulens_eval/tests/unit/test_tru_chain.py
index 966504190..fd32a7315 100644
--- a/trulens_eval/tests/unit/test_tru_chain.py
+++ b/trulens_eval/tests/unit/test_tru_chain.py
@@ -179,8 +179,6 @@ def test_async_with_task(self):
         llm = ChatOpenAI(temperature=0.0, streaming=False, cache=False)
         chain = LLMChain(llm=llm, prompt=prompt)

-        res1, costs1 = await Endpoint.atrack_all_costs(chain.llm._agenerate, messages=[msg])
-
         res1, costs1 = sync(Endpoint.atrack_all_costs, test1)

         async def test2():
diff --git a/trulens_eval/trulens_eval/tru.py b/trulens_eval/trulens_eval/tru.py
index 9cc100707..d971f9763 100644
--- a/trulens_eval/trulens_eval/tru.py
+++ b/trulens_eval/trulens_eval/tru.py
@@ -1,5 +1,6 @@
 from collections import defaultdict
-from concurrent.futures import as_completed
+from concurrent.futures import as_completed
+from concurrent.futures import wait
 from concurrent.futures import TimeoutError
 from datetime import datetime
 from datetime import timedelta
@@ -278,7 +278,7 @@ def _submit_feedback_functions(

         for ffunc in feedback_functions:
             fut: Future[FeedbackResult] = \
-                tp.submit(ffunc.run_and_log, app=app, record=record, tru=self)
+                tp.submit(ffunc.run, app=app, record=record)

             if on_done is not None:
                 fut.add_done_callback(on_done)
@@ -294,8 +294,8 @@ def run_feedback_functions(
         app: Optional[AppDefinition] = None,
         wait: bool = True
     ) -> Union[
-        Iterable[Tuple[Feedback, FeedbackResult]],
-        Iterable[Tuple[Feedback, Future[FeedbackResult]]]
+        Iterable[FeedbackResult],
+        Iterable[Future[FeedbackResult]]
     ]:
         """
         Run a collection of feedback functions and report their result.
@@ -314,9 +314,8 @@ def run_feedback_functions(
         - wait: (bool, optional): If set (default), will wait for results
           before returning.

-        Yields tuples of `Feedback` and their `FeedbackResult`, one tuple for
-        each element of `feedback_functions` potentially in random order. If
-        `wait` is set to `False`, yields tuples of `Feedback` and
+        Yields `FeedbackResult`, one for each element of `feedback_functions`,
+        potentially in random order. If `wait` is set to `False`, yields
         `Future[FeedbackResult]` instead.
         """

@@ -336,12 +335,21 @@ def run_feedback_functions(
         if wait:
             # In blocking mode, wait for futures to complete.
             for fut_result in as_completed(future_feedback_map.keys()):
-                yield (future_feedback_map[fut_result], fut_result.result())
+                # TODO: Do we want a version that also gives the feedback
+                # whose result is being produced? This is more useful in the
+                # Future case as we cannot associate a Future result with its
+                # feedback before the result is ready.
+
+                # yield (future_feedback_map[fut_result], fut_result.result())
+                yield fut_result.result()

         else:
-            # In non-blocking, return the futures instead.
+            # In non-blocking mode, return the futures instead.
             for fut_result, feedback in future_feedback_map.items():
-                yield (feedback, fut_result)
+                # TODO: see prior.
+
+                # yield (feedback, fut_result)
+                yield fut_result

     def add_app(self, app: AppDefinition) -> None:
         """
@@ -352,31 +360,51 @@ def add_app(self, app: AppDefinition) -> None:

     def add_feedback(
         self,
-        feedback_result: Optional[FeedbackResult] = None,
+        feedback_result_or_future: Optional[Union[FeedbackResult, Future[FeedbackResult]]] = None,
         **kwargs
     ) -> None:
         """
-        Add a single feedback result to the database.
+        Add a single feedback result to the database. Accepts a
+        `FeedbackResult`, a `Future[FeedbackResult]`, or kwargs from which to
+        create a `FeedbackResult`. If a `Future` is given, it will wait for
+        the result before adding it. If kwargs are given alongside a
+        `FeedbackResult`, the kwargs are used to update it.
         """

-        if feedback_result is None:
+        if feedback_result_or_future is None:
             if 'result' in kwargs and 'status' not in kwargs:
                 # If result already present, set status to done.
                 kwargs['status'] = FeedbackResultStatus.DONE

-            feedback_result = FeedbackResult(**kwargs)
+            feedback_result_or_future = FeedbackResult(**kwargs)
+
         else:
-            feedback_result.update(**kwargs)
+            if isinstance(feedback_result_or_future, Future):
+                wait([feedback_result_or_future])
+                feedback_result_or_future = feedback_result_or_future.result()
+            elif isinstance(feedback_result_or_future, FeedbackResult):
+                pass
+            else:
+                raise ValueError(
+                    f"Unknown type {type(feedback_result_or_future)} for feedback_result_or_future."
+                )
+
+            feedback_result_or_future.update(**kwargs)

-        self.db.insert_feedback(feedback_result=feedback_result)
+        self.db.insert_feedback(feedback_result=feedback_result_or_future)

-    def add_feedbacks(self, feedback_results: Iterable[FeedbackResult]) -> None:
+    def add_feedbacks(
+        self,
+        feedback_results: Iterable[Union[FeedbackResult, Future[FeedbackResult]]]
+    ) -> None:
         """
-        Add multiple feedback results to the database.
+        Add multiple feedback results to the database. Accepts an iterable of
+        `FeedbackResult` or `Future[FeedbackResult]`. If a `Future` is given,
+        it will wait for the result before adding it to the database.
         """

-        for feedback_result in feedback_results:
-            self.add_feedback(feedback_result=feedback_result)
+        for feedback_result_or_future in feedback_results:
+            self.add_feedback(feedback_result_or_future=feedback_result_or_future)

     def get_app(self, app_id: Optional[str] = None) -> JSON:
         """
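Reviewer note: below is a minimal usage sketch of the API after this change. It is not part of the diff; it mirrors the test setup above. The app wiring (`CustomApp`, `TruCustomApp`, the `Dummy` provider) comes from the tests, but the specific feedback definition (`positive_sentiment`), the `respond_to_query` method, and the `with_record` helper are illustrative assumptions, not code introduced by this PR.

```python
from concurrent.futures import Future

from examples.expositional.end2end_apps.custom_app.custom_app import CustomApp

from trulens_eval import Feedback
from trulens_eval import Tru
from trulens_eval import TruCustomApp
from trulens_eval.feedback.provider.hugs import Dummy
from trulens_eval.schema import FeedbackResult

tru = Tru()

# App and feedback setup, mirroring the tests above. The feedback choice is
# an assumption for illustration; any Feedback over the Dummy provider works.
app = CustomApp()
tru_app = TruCustomApp(app)
f_sentiment = Feedback(Dummy().positive_sentiment).on_output()

# Produce a record to evaluate. `with_record` and `respond_to_query` are
# assumed here: the former invokes the app method and returns its response
# together with the recorded trace.
response, record = tru_app.with_record(app.respond_to_query, "hello")

# Blocking mode now yields bare FeedbackResult values, no longer
# (Feedback, FeedbackResult) tuples as before this change.
results = list(
    tru.run_feedback_functions(
        record=record, feedback_functions=[f_sentiment], app=tru_app, wait=True
    )
)
assert all(isinstance(res, FeedbackResult) for res in results)

# Non-blocking mode yields Future[FeedbackResult] instead.
futures = list(
    tru.run_feedback_functions(
        record=record, feedback_functions=[f_sentiment], app=tru_app, wait=False
    )
)
assert all(isinstance(fut, Future) for fut in futures)

# add_feedbacks now accepts FeedbackResult or Future[FeedbackResult];
# futures are waited on before insertion into the database.
tru.add_feedbacks(futures)
```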