diff --git a/langtest/metrics/llm_eval.py b/langtest/metrics/llm_eval.py
index db1b31a05..b53853856 100644
--- a/langtest/metrics/llm_eval.py
+++ b/langtest/metrics/llm_eval.py
@@ -1,6 +1,7 @@
 import re
 import string
-from typing import List, Optional, Tuple
+from textwrap import dedent
+from typing import List, Mapping, Optional, Tuple
 from ..utils.custom_types.helpers import HashableDict
 
 template = """You are a teacher grading a quiz.
@@ -22,10 +23,82 @@
 input_variables = ["query", "result", "answer"]
 
 
+class EvalTemplate:
+    """
+    The EvalTemplate class provides a method to build a prompt for evaluating student answers
+    based on a given rubric. The prompt is designed for a teacher to grade a quiz by comparing
+    the student's answer with the true answer and scoring it according to the specified criteria.
+
+    Methods
+    -------
+    build_prompt(rubic_score: Mapping[str, str] = {"CORRECT": None, "INCORRECT": None}) -> str
+        Constructs and returns a grading prompt based on the provided rubric scores.
+    """
+
+    @staticmethod
+    def build_prompt(
+        rubic_score: Mapping[str, str] = {
+            "CORRECT": None,
+            "INCORRECT": None,
+        }
+    ):
+        """Construct and return a grading prompt based on the provided rubric scores."""
+        grade_list = list(rubic_score.keys())
+        grade_list = ", ".join(grade_list[:-1]) + f" or {grade_list[-1]}"
+
+        eval_criteria = [
+            f"{grade_name}: {criteria}\n"
+            for grade_name, criteria in rubic_score.items()
+            if criteria
+        ]
+        prompt = (
+            "You are a teacher grading a quiz. You are given a question, the student's "
+            "answer, and the true answer, and are asked to score the student answer as either "
+            f"{grade_list}."
+        )
+
+        if eval_criteria:
+            eval_criteria = "".join(eval_criteria)
+            prompt += dedent(
+                f"""\n\nScore the student answer based on the following criteria:\n{eval_criteria}"""
+            )
+
+        prompt += dedent(
+            f"""
+            Example Format:
+            QUESTION: question here
+            STUDENT ANSWER: student's answer here
+            TRUE ANSWER: true answer here
+            GRADE: {grade_list} here
+
+            {
+                ("Grade the student answers based ONLY on their factual accuracy. Ignore differences"
+                 " in punctuation and phrasing between the student answer and true answer. It is OK "
+                 "if the student answer contains more relevant information than the true answer, as"
+                 " long as it does not contain any conflicting statements. Begin!")
+            }
+
+            QUESTION: {{query}}
+            STUDENT ANSWER: {{result}}
+            TRUE ANSWER: {{answer}}
+            GRADE:"""
+        )
+        return prompt
+
+
 class LlmEval:
     """llm_eval for evaluating question answering."""
 
-    def __init__(self, llm, template=template, input_variables=input_variables):
+    grade_list = None
+
+    def __init__(
+        self,
+        llm,
+        template=template,
+        input_variables=input_variables,
+        grade_list=None,
+    ):
         """
         Initializes the LlmEval object.
@@ -42,6 +115,7 @@ def __init__(self, llm, template=template, input_variables=input_variables):
         self.template = template
         self.input_variables = input_variables
         self.server_prompt = server_prompt
+        LlmEval.grade_list = grade_list
 
         expected_input_vars = {"query", "answer", "result"}
         if expected_input_vars != set(self.input_variables):
@@ -52,33 +126,55 @@
     @staticmethod
     def _get_score(text: str) -> Optional[Tuple[str, int]]:
-        match = re.search(r"grade:\s*(correct|incorrect)", text.strip(), re.IGNORECASE)
+        if LlmEval.grade_list is None:
+            default_grades = ["CORRECT", "INCORRECT"]
+            grade_list_pattern = f"grade:\\s*({'|'.join(default_grades).lower()})"
+        else:
+            grade_list_pattern = f"(?:grade\\s*)?({'|'.join(LlmEval.grade_list).lower()})"
+
+        match = re.search(grade_list_pattern, text.strip(), re.IGNORECASE)
         if match:
-            if match.group(1).upper() == "CORRECT":
-                return "CORRECT", 1
-            elif match.group(1).upper() == "INCORRECT":
-                return "INCORRECT", 0
-        try:
-            first_word = (
-                text.strip()
-                .split()[0]
-                .translate(str.maketrans("", "", string.punctuation))
-            )
-            if first_word.upper() == "CORRECT":
-                return "CORRECT", 1
-            elif first_word.upper() == "INCORRECT":
-                return "INCORRECT", 0
-            last_word = (
-                text.strip()
-                .split()[-1]
-                .translate(str.maketrans("", "", string.punctuation))
-            )
-            if last_word.upper() == "CORRECT":
-                return "CORRECT", 1
-            elif last_word.upper() == "INCORRECT":
-                return "INCORRECT", 0
-        except IndexError:
-            pass
+            grade = match.group(1).upper()
+            if LlmEval.grade_list is None:
+                if grade == "CORRECT":
+                    return "CORRECT", 1
+                elif grade == "INCORRECT":
+                    return "INCORRECT", 0
+            elif grade in LlmEval.grade_list:
+                return grade, LlmEval.grade_list.index(grade)
+        else:
+            try:
+                # Check for first word
+                first_word = (
+                    text.strip()
+                    .split()[0]
+                    .translate(str.maketrans("", "", string.punctuation))
+                )
+                if LlmEval.grade_list is None:
+                    if first_word.upper() == "CORRECT":
+                        return "CORRECT", 1
+                    elif first_word.upper() == "INCORRECT":
+                        return "INCORRECT", 0
+                elif first_word.upper() in LlmEval.grade_list:
+                    return first_word.upper(), LlmEval.grade_list.index(
+                        first_word.upper()
+                    )
+
+                # Check for last word
+                last_word = (
+                    text.strip()
+                    .split()[-1]
+                    .translate(str.maketrans("", "", string.punctuation))
+                )
+                if LlmEval.grade_list is None:
+                    if last_word.upper() == "CORRECT":
+                        return "CORRECT", 1
+                    elif last_word.upper() == "INCORRECT":
+                        return "INCORRECT", 0
+                elif last_word.upper() in LlmEval.grade_list:
+                    return last_word.upper(), LlmEval.grade_list.index(last_word.upper())
+            except IndexError:
+                pass
         return None
 
     @staticmethod
diff --git a/langtest/utils/custom_types/helpers.py b/langtest/utils/custom_types/helpers.py
index ac04cb20c..cb7866128 100644
--- a/langtest/utils/custom_types/helpers.py
+++ b/langtest/utils/custom_types/helpers.py
@@ -2,7 +2,8 @@
 from pydantic import BaseModel
 from collections.abc import Hashable
 import importlib
-from typing import List, Tuple
+from typing import List, Tuple, Union
+
 from ...errors import Errors
 
 default_user_prompt = {
@@ -350,6 +351,7 @@ def is_pass_llm_eval(
     answer: str,
     perturbed_question: str,
     prediction: str,
+    eval_template: Union[str, dict] = None,
 ):
     """
     Determines whether the model's prediction passes the Language Model Metric (LLM) evaluation.
@@ -367,22 +369,47 @@
     """
 
-    if prediction.lower().strip() == answer.lower().strip():
-        return True
+    if eval_template is None:
+        if prediction.lower().strip() == answer.lower().strip():
+            return True
 
     inputs, predictions = prepare_llm_evaluation_data(
         original_question, answer, perturbed_question, prediction
     )
+
+    grades = None
+    if eval_template is None:
+        # from ...transform.constants import qa_prompt_template as template
+        from ...metrics.llm_eval import template
+
+        eval_template = template
+    elif isinstance(eval_template, dict):
+        from ...metrics.llm_eval import EvalTemplate
+
+        rubic_score_dict = eval_template.get("rubic_score", None)
+        grades = list(rubic_score_dict.keys())
+
+        eval_template = EvalTemplate.build_prompt(rubic_score_dict)
+
     if "llm" in str(type(eval_model)):
-        result = llm_prompt_eval(eval_model, dataset_name, inputs, predictions)
+        result = llm_prompt_eval(
+            eval_model, dataset_name, inputs, predictions, eval_template, grades
+        )
     else:
-        result = transformer_prompt_eval(eval_model, inputs, predictions)
+        result = transformer_prompt_eval(
+            eval_model, inputs, predictions, eval_template, grades
+        )
 
     return result
 
 
 def llm_prompt_eval(
-    eval_model, dataset_name: str, inputs: List[dict], predictions: List[dict]
+    eval_model,
+    dataset_name: str,
+    inputs: List[dict],
+    predictions: List[dict],
+    template: str = None,
+    grades: List[str] = None,
 ) -> bool:
     """
     Evaluates model predictions using the Language Model Metric (LLM) with prompt-based evaluation.
@@ -400,9 +427,6 @@
     from langchain.evaluation.qa import QAEvalChain
     from langchain.prompts import PromptTemplate
 
-    # from ...transform.constants import qa_prompt_template as template
-    from ...metrics.llm_eval import template
-
     PROMPT = PromptTemplate(
         input_variables=["query", "answer", "result"],
         template=template,
     )
@@ -436,17 +460,31 @@
         answer_key="answer",
         prediction_key="text",
     )
-    result = bool(
-        re.match(
-            r"CORRECT|TRUE",
+    if grades:
+        # Extract the grade from the result by matching the pattern
+        result = re.sub(
+            r"GRADE: ",
+            "",
             list(graded_outputs[0].values())[0].replace("\n", "").strip(),
         )
-    )
+        match = re.search(f"({'|'.join(grades)})", result, re.IGNORECASE).group(0)
+        return match
+    else:
+        result = bool(
+            re.match(
+                r"CORRECT|TRUE",
+                list(graded_outputs[0].values())[0].replace("\n", "").strip(),
+            )
+        )
     return result
 
 
 def transformer_prompt_eval(
-    eval_model, inputs: List[dict], predictions: List[dict]
+    eval_model,
+    inputs: List[dict],
+    predictions: List[dict],
+    template: str = None,
+    grades: List[str] = None,
 ) -> bool:
     """
     Evaluates model predictions using a transformer-based language model.
@@ -461,7 +499,7 @@
     """
     from ...metrics.llm_eval import LlmEval
 
-    eval_chain = LlmEval(llm=eval_model)
+    eval_chain = LlmEval(llm=eval_model, template=template, grade_list=grades)
     graded_outputs = eval_chain.evaluate(
         inputs,
         predictions,
@@ -469,7 +507,16 @@
         answer_key="answer",
         prediction_key="result",
     )
-    result = list(graded_outputs[0].values())[0].replace("\n", "").strip() == "CORRECT"
+    if grades is None:
+        result = (
+            list(graded_outputs[0].values())[0].replace("\n", "").strip() == "CORRECT"
+        )
+    else:
+        result = re.sub(
+            r"GRADE: ",
+            "",
+            list(graded_outputs[0].values())[0].replace("\n", "").strip(),
+        )
     return result
diff --git a/langtest/utils/custom_types/sample.py b/langtest/utils/custom_types/sample.py
index dd4b49300..bdb34362f 100644
--- a/langtest/utils/custom_types/sample.py
+++ b/langtest/utils/custom_types/sample.py
@@ -399,7 +399,7 @@ class BaseQASample(BaseModel):
     state: str = None
     task: str = Field(default="question-answering", const=True)
     test_case: str = None
-    config: str = None
+    config: Mapping[str, Mapping] = None
     distance_result: float = None
     eval_model: Union[str, tuple] = None
     ran_pass: bool = None
@@ -553,6 +553,8 @@ def __update_params(self):
                 self.eval_model = load_eval_model.model(
                     model, hub, **harness_config.get("model_parameters", {})
                 )
+            else:
+                self.eval_model = EVAL_MODEL
 
         else:
             self.eval_model = EVAL_MODEL
@@ -656,6 +658,12 @@ def is_pass(self) -> bool:
         elif self.metric_name == "llm_eval":
             if isinstance(self.eval_model, dict):
                 self.eval_model = list(self.eval_model.values())[-1]
+
+            # get the template for evaluation
+
+            template = self.config.get("evaluation", {}).get("eval_prompt", None)
+
+            # run the metric function
             result = metric_function(
                 eval_model=self.eval_model,
                 dataset_name=self.dataset_name,
@@ -663,6 +671,7 @@
                 answer=self.expected_results,
                 perturbed_question=self.perturbed_question,
                 prediction=self.actual_results,
+                eval_template=template,
             )
 
             self.ran_pass = result
diff --git a/langtest/utils/report_utils.py b/langtest/utils/report_utils.py
index a26431642..4d498e449 100644
--- a/langtest/utils/report_utils.py
+++ b/langtest/utils/report_utils.py
@@ -122,50 +122,119 @@
     """
     report = {}
+    unique_labels = []
+
     for sample in generated_results:
         if sample.test_type in ["degradation_analysis"]:
             continue
+
+        pass_value = str(sample.is_pass()).lower()
         summary[sample.test_type]["category"] = sample.category
-        summary[sample.test_type][str(sample.is_pass()).lower()] += 1
-    for test_type, value in summary.items():
-        pass_rate = summary[test_type]["true"] / (
-            summary[test_type]["true"] + summary[test_type]["false"]
+        summary[sample.test_type][pass_value] += 1
+        if pass_value not in unique_labels:
+            unique_labels.append(pass_value)
+
+    for test_type, test_values in summary.items():
+        # get the minimum pass rate for the test type from min_pass_dict or default_min_pass_dict
+        min_pass_rate = min_pass_dict.get(test_type, default_min_pass_dict)
+
+        # get the minimum pass rate for multiple perturbations when the test type contains "-"
+        if "-" in test_type and test_values["category"] == "robustness":
+            multiple_perturbations_min_pass_rate = min_pass_dict.get(
+                "multiple_perturbations", default_min_pass_dict
+            )
+            min_pass_rate = min_pass_dict.get(
+                test_type, multiple_perturbations_min_pass_rate
             )
-        min_pass_rate = min_pass_dict.get(test_type, default_min_pass_dict)
-        if "-" in test_type and summary[test_type]["category"] == "robustness":
-            multiple_perturbations_min_pass_rate = min_pass_dict.get(
-                "multiple_perturbations", default_min_pass_dict
-            )
-            min_pass_rate = min_pass_dict.get(
-                test_type, multiple_perturbations_min_pass_rate
-            )
-        if summary[test_type]["category"] in ["Accuracy", "performance"]:
-            min_pass_rate = 1
+        # Accuracy and performance tests should have a minimum pass rate of 1
+        if test_values["category"] in ["accuracy", "performance"]:
+            min_pass_rate = 1
 
-        report[test_type] = {
-            "category": summary[test_type]["category"],
-            "fail_count": summary[test_type]["false"],
-            "pass_count": summary[test_type]["true"],
-            "pass_rate": pass_rate,
-            "minimum_pass_rate": min_pass_rate,
-            "pass": pass_rate >= min_pass_rate,
-        }
+        # create a temporary dictionary to store the category and the per-label counts
+        # (pass/fail or score_1, score_2, score_3, etc.)
+        temp = {
+            "category": test_values["category"],
+        }
+
+        # handle multiple label keys in the dictionary, e.g. (true, false) or (score_1, score_2, score_3)
+        record_count = sum(
+            num for num in test_values.values() if isinstance(num, (int, float))
+        )
+        # record_count = test_values["total"]
+
+        if record_count == 0:
+            temp.update(
+                {
+                    "fail_count": 0,
+                    "pass_count": 0,
+                    "pass_rate": 0,
+                    "minimum_pass_rate": min_pass_rate,
+                    "pass": False,
+                }
+            )
+        else:
+            ispass = False
+            for key, value in test_values.items():
+                if key in ("category",):
+                    continue
+
+                name = "pass" if key == "true" else "fail" if key == "false" else key
+                temp[name + "_count"] = value if value else 0
+
+                if key in ["true", "false"]:
+                    pass_rate = summary[test_type].get("true", 0) / record_count
+                    ispass = pass_rate >= min_pass_rate
+                    temp.update(
+                        {
+                            "pass_rate": pass_rate,
+                            "minimum_pass_rate": min_pass_rate,
+                            "pass": ispass,
+                        }
+                    )
+
+        report[test_type] = temp
 
     df_report = pd.DataFrame.from_dict(report, orient="index")
     df_report = df_report.reset_index().rename(columns={"index": "test_type"})
-    df_report["pass_rate"] = df_report["pass_rate"].apply(
-        lambda x: "{:.0f}%".format(x * 100)
-    )
-    df_report["minimum_pass_rate"] = df_report["minimum_pass_rate"].apply(
-        lambda x: "{:.0f}%".format(x * 100)
-    )
-    col_to_move = "category"
-    first_column = df_report.pop("category")
-    df_report.insert(0, col_to_move, first_column)
+    if "pass_rate" in df_report.columns and "minimum_pass_rate" in df_report.columns:
+        df_report["pass_rate"] = df_report["pass_rate"].apply(
+            lambda x: "{:.0f}%".format(x * 100)
+        )
+        df_report["minimum_pass_rate"] = df_report["minimum_pass_rate"].apply(
+            lambda x: "{:.0f}%".format(x * 100)
+        )
+
+    # rearrange the columns
+    columns = df_report.columns.tolist()
+
+    ordered_columns = [
+        "category",
+        "test_type",
+        "fail_count",
+        "pass_count",
+        "pass_rate",
+        "minimum_pass_rate",
+        "pass",
+    ] + [f"{col}_count" for col in unique_labels if col not in ["true", "false"]]
+    df_report = df_report.reset_index(drop=True)
+    columns = list(set(columns))
+    columns = sorted(
+        columns,
+        key=lambda x: ordered_columns.index(x)
+        if x in ordered_columns
+        else len(ordered_columns),
+    )
+
+    # df_report = df_report.T.drop_duplicates().T
+    # col_to_move = "category"
+    # first_column = df_report.pop("category")
+    # df_report.insert(0, col_to_move, first_column)
+    df_report = df_report[columns]
+    df_report.loc[:, [col for col in columns if col.endswith("_count")]] = df_report[
+        [col for col in columns if col.endswith("_count")]
+    ].fillna(0)
 
     df_report = df_report.fillna("-")
     return df_report
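
Usage sketch (not part of the patch): a minimal example of how the new EvalTemplate.build_prompt could be driven by a custom rubric. The rubric names and criteria below are illustrative assumptions, not defaults shipped with langtest.

    from langtest.metrics.llm_eval import EvalTemplate

    # Hypothetical rubric: any mapping of grade name -> criteria works; entries with
    # None criteria are omitted from the "Score the student answer..." section.
    rubric = {
        "score_1": "The student answer contradicts the true answer.",
        "score_2": "The student answer is partially consistent with the true answer.",
        "score_3": "The student answer is fully consistent with the true answer.",
    }

    prompt = EvalTemplate.build_prompt(rubric)
    # The returned string keeps literal {query}, {result} and {answer} placeholders,
    # matching the input_variables used by LlmEval and the QAEvalChain PromptTemplate.
    print(prompt)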
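Config sketch (hedged reconstruction from the diff, not documented API): BaseQASample.is_pass() reads config["evaluation"]["eval_prompt"], and is_pass_llm_eval() treats a dict value as a rubric whose "rubic_score" keys become the grade list. A harness config fragment enabling this path might therefore look like the following; the "metric" key is assumed to be what sets self.metric_name to "llm_eval".

    # Illustrative fragment only; other harness sections (tests, model_parameters, ...) unchanged.
    evaluation_config = {
        "evaluation": {
            "metric": "llm_eval",  # assumption: selects the llm_eval branch in is_pass()
            "eval_prompt": {
                "rubic_score": {
                    "CORRECT": "The student answer states the same facts as the true answer.",
                    "INCORRECT": "The student answer contradicts or omits the true answer.",
                }
            },
        }
    }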