diff --git a/assets/training/finetune_acft_hf_nlp/components/pipeline_components/chat_completion/spec.yaml b/assets/training/finetune_acft_hf_nlp/components/pipeline_components/chat_completion/spec.yaml
index feb38efded..aea31c4cef 100644
--- a/assets/training/finetune_acft_hf_nlp/components/pipeline_components/chat_completion/spec.yaml
+++ b/assets/training/finetune_acft_hf_nlp/components/pipeline_components/chat_completion/spec.yaml
@@ -488,9 +488,9 @@ outputs:
     description: output folder containing _best_ finetuned model in mlflow format.
     mode: rw_mount
-  # evaluation_result:
-  #   type: uri_folder
-  #   description: Test Data Evaluation Results
+  evaluation_result:
+    type: uri_folder
+    description: Test Data Evaluation Results
 
 jobs:
   ft_nlp_common_validation:
@@ -627,34 +627,30 @@ jobs:
       # converted_model: '${{parent.jobs.chat_completion_finetune.outputs.mlflow_model_folder}}'
     outputs:
       mlflow_model_folder: '${{parent.outputs.mlflow_model_folder}}'
-  # model_prediction:
-  #   type: command
-  #   component: azureml:model_prediction:0.0.21
-  #   compute: '${{parent.inputs.compute_model_evaluation}}'
-  #   resources:
-  #     instance_type: '${{parent.inputs.instance_type_model_evaluation}}'
-  #   inputs:
-  #     task: chat-completion
-  #     test_data: '${{parent.jobs.chat_completion_datapreprocess.outputs.output_dir}}'
-  #     label_column_name: ''
-  #     input_column_names: "''"
-  #     batch_size: '${{parent.inputs.per_device_train_batch_size}}'
-  #     device: auto
-  #     mlflow_model: '${{parent.jobs.chat_completion_model_converter.outputs.mlflow_model_folder}}'
-  # compute_metrics:
-  #   type: command
-  #   component: azureml:compute_metrics:0.0.21
-  #   compute: '${{parent.inputs.compute_model_evaluation}}'
-  #   resources:
-  #     instance_type: '${{parent.inputs.instance_type_model_evaluation}}'
-  #   inputs:
-  #     task: chat-completion
-  #     ground_truth: '${{parent.jobs.model_prediction.outputs.ground_truth}}'
-  #     ground_truth_column_name: '${{parent.inputs.answers_key}}'
-  #     prediction: '${{parent.jobs.model_prediction.outputs.predictions}}'
-  #     prediction_column_name: predictions
-  #     prediction_probabilities: '${{parent.jobs.model_prediction.outputs.prediction_probabilities}}'
-  #     evaluation_config: '${{parent.inputs.evaluation_config}}'
-  #     evaluation_config_params: '${{parent.inputs.evaluation_config_params}}'
-  #   outputs:
-  #     evaluation_result: '${{parent.outputs.evaluation_result}}'
+  model_prediction:
+    type: command
+    component: azureml:model_prediction_with_container:0.0.2
+    compute: '${{parent.inputs.compute_model_evaluation}}'
+    resources:
+      instance_type: '${{parent.inputs.instance_type_model_evaluation}}'
+    inputs:
+      task: chat-completion
+      test_data: '${{parent.jobs.chat_completion_datapreprocess.outputs.output_dir}}'
+      label_column_name: messages
+      mlflow_model: '${{parent.jobs.chat_completion_model_converter.outputs.mlflow_model_folder}}'
+      evaluation_config_params: '${{parent.inputs.evaluation_config_params}}'
+  compute_metrics:
+    type: command
+    component: azureml:compute_metrics:0.0.31
+    compute: '${{parent.inputs.compute_model_evaluation}}'
+    resources:
+      instance_type: '${{parent.inputs.instance_type_model_evaluation}}'
+    inputs:
+      task: chat-completion
+      ground_truth: '${{parent.jobs.model_prediction.outputs.ground_truth}}'
+      prediction: '${{parent.jobs.model_prediction.outputs.predictions}}'
+      prediction_probabilities: '${{parent.jobs.model_prediction.outputs.prediction_probabilities}}'
+      evaluation_config: '${{parent.inputs.evaluation_config}}'
+      evaluation_config_params: '${{parent.inputs.evaluation_config_params}}'
+    outputs:
+      evaluation_result: '${{parent.outputs.evaluation_result}}'
diff --git a/assets/training/finetune_acft_hf_nlp/components/pipeline_components/text_generation/spec.yaml b/assets/training/finetune_acft_hf_nlp/components/pipeline_components/text_generation/spec.yaml
index 7496151dfa..e676b70906 100644
--- a/assets/training/finetune_acft_hf_nlp/components/pipeline_components/text_generation/spec.yaml
+++ b/assets/training/finetune_acft_hf_nlp/components/pipeline_components/text_generation/spec.yaml
@@ -667,7 +667,7 @@ jobs:
       mlflow_model_folder: '${{parent.outputs.mlflow_model_folder}}'
   model_prediction:
     type: command
-    component: azureml:model_prediction:0.0.30
+    component: azureml:model_prediction_with_container:0.0.2
     compute: '${{parent.inputs.compute_model_evaluation}}'
     resources:
       instance_type: '${{parent.inputs.instance_type_model_evaluation}}'
@@ -676,10 +676,7 @@ jobs:
       test_data: '${{parent.jobs.text_generation_datapreprocess.outputs.output_dir}}'
       label_column_name: '${{parent.inputs.ground_truth_key}}'
       input_column_names: '${{parent.inputs.text_key}}'
-      batch_size: '${{parent.inputs.per_device_train_batch_size}}'
-      device: auto
-      mlflow_model: '${{parent.jobs.text_generation_model_converter.outputs.mlflow_model_folder}}'
-      evaluation_config: '${{parent.inputs.evaluation_config}}'
+      mlflow_model: '${{parent.jobs.ft_nlp_model_converter.outputs.mlflow_model_folder}}'
       evaluation_config_params: '${{parent.inputs.evaluation_config_params}}'
   compute_metrics:
     type: command
diff --git a/assets/training/model_evaluation/components/compute_metrics/spec.yaml b/assets/training/model_evaluation/components/compute_metrics/spec.yaml
index cef89c4186..9f47115e41 100644
--- a/assets/training/model_evaluation/components/compute_metrics/spec.yaml
+++ b/assets/training/model_evaluation/components/compute_metrics/spec.yaml
@@ -3,7 +3,7 @@ name: compute_metrics
 display_name: Compute Metrics
 description: Calculate model performance metrics, given ground truth and prediction data.
 
-version: 0.0.30
+version: 0.0.31
 type: command
 tags:
   type: evaluation
diff --git a/assets/training/model_evaluation/components/distributed_model_prediction/spec.yaml b/assets/training/model_evaluation/components/distributed_model_prediction/spec.yaml
index eb7e740cc1..5aef9b5759 100644
--- a/assets/training/model_evaluation/components/distributed_model_prediction/spec.yaml
+++ b/assets/training/model_evaluation/components/distributed_model_prediction/spec.yaml
@@ -1,6 +1,6 @@
 $schema: https://azuremlschemas.azureedge.net/latest/pipelineComponent.schema.json
 name: model_prediction_with_container
-version: 0.0.1
+version: 0.0.2
 type: command
 display_name: Distributed Model Prediction
 description: "Optimized Distributed inference component for LLMs."
@@ -69,7 +69,7 @@ outputs:
 
 code: ../../src_distributed
 
-environment: azureml://registries/azureml/environments/foundation-model-inference/versions/42
+environment: azureml://registries/azureml/environments/foundation-model-inference/versions/46
 
 command: >-
   python download_extra_dependency.py --mlflow-model '${{inputs.mlflow_model}}' ;
diff --git a/assets/training/model_evaluation/components/model_prediction/spec.yaml b/assets/training/model_evaluation/components/model_prediction/spec.yaml
index 676eae12f4..7772524f9f 100644
--- a/assets/training/model_evaluation/components/model_prediction/spec.yaml
+++ b/assets/training/model_evaluation/components/model_prediction/spec.yaml
@@ -3,7 +3,7 @@ name: model_prediction
 display_name: Model Prediction
 description: Generate predictions on a given mlflow model for supported tasks.
 
-version: 0.0.30
+version: 0.0.31
 type: command
 tags:
   type: evaluation
diff --git a/assets/training/model_evaluation/components/pipeline_component/spec.yaml b/assets/training/model_evaluation/components/pipeline_component/spec.yaml
index 0ac6691c66..d476ef72b0 100644
--- a/assets/training/model_evaluation/components/pipeline_component/spec.yaml
+++ b/assets/training/model_evaluation/components/pipeline_component/spec.yaml
@@ -1,6 +1,6 @@
 $schema: https://azuremlschemas.azureedge.net/latest/pipelineComponent.schema.json
 name: model_evaluation_pipeline
-version: 0.0.30
+version: 0.0.31
 type: pipeline
 display_name: Model Evaluation Pipeline
 description: Pipeline component for model evaluation for supported tasks.
@@ -87,7 +87,7 @@ outputs:
 jobs:
   validation_trigger_model_evaluation:
     type: command
-    component: azureml:validation_trigger_model_evaluation:0.0.30
+    component: azureml:validation_trigger_model_evaluation:0.0.31
     compute: '${{parent.inputs.compute_name}}'
     resources:
       instance_type: '${{parent.inputs.instance_type}}'
@@ -111,7 +111,7 @@ jobs:
 
   model_prediction:
     type: command
-    component: azureml:model_prediction:0.0.30
+    component: azureml:model_prediction:0.0.31
     compute: '${{parent.inputs.compute_name}}'
     resources:
       instance_type: '${{parent.inputs.instance_type}}'
@@ -128,7 +128,7 @@ jobs:
 
   compute_metrics:
     type: command
-    component: azureml:compute_metrics:0.0.30
+    component: azureml:compute_metrics:0.0.31
     compute: '${{parent.inputs.compute_name}}'
     resources:
       instance_type: '${{parent.inputs.instance_type}}'
diff --git a/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml b/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml
index 049d753344..7b6d3eb5e4 100644
--- a/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml
+++ b/assets/training/model_evaluation/components/validation_trigger_model_evaluation/spec.yaml
@@ -3,7 +3,7 @@ name: validation_trigger_model_evaluation
 display_name: Validation Trigger Model Evaluation
 description: Component for enabling validation of model evaluation pipeline.
 
-version: 0.0.30
+version: 0.0.31
 type: command
 tags:
   type: evaluation
diff --git a/assets/training/model_evaluation/src/evaluators/evaluators.py b/assets/training/model_evaluation/src/evaluators/evaluators.py
index 6bd999b0ed..9021da11e6 100644
--- a/assets/training/model_evaluation/src/evaluators/evaluators.py
+++ b/assets/training/model_evaluation/src/evaluators/evaluators.py
@@ -617,16 +617,45 @@ def evaluate(self, y_test, y_pred, **kwargs):
         """
         # dataframe with 2 columns predictions and predictions appended to the conversation
         if len(y_pred.columns) > 1:
-            y_pred_formatted = [
-                list(item[ChatCompletionConstants.OUTPUT_FULL_CONVERSATION][0].values())[0]
-                for idx, item in y_pred.iterrows()
-            ]
+            logger.info("Found more than 1 col. Trying to fetch conversation.")
+
+            def check_item(row_item: pd.Series):
+                """Convert input data to correct format for metrics package.
+
+                Args:
+                    row_item (pd.Series): Single row input from Dataframe
+                """
+                item = row_item.get(ChatCompletionConstants.OUTPUT_FULL_CONVERSATION, None)
+                if item is None:
+                    return row_item
+                if isinstance(item, list) and isinstance(item[0], dict):
+                    if item[0].get("role", False) and item[0].get("content", False):
+                        return item
+                    else:
+                        if item[0].get("0", False):
+                            return item["0"]
+                return item
+
+            y_pred_formatted = y_pred.apply(check_item, axis=1).tolist()
         # dataframe wih just predictions appended to conversations
         else:
-            y_pred_formatted = y_pred.values.tolist()[0]
-        # if ground truth is passed
+            y_pred_formatted = y_pred.values.tolist()
+        # if ground truth is passed
         if y_test is not None and len(y_test) > 0:
-            y_test = y_test.iloc[:, 0].apply(lambda x: [x]).tolist()
+
+            def check_y_test(row_item: pd.Series):
+                """Convert ground truth into correct format for metrics package.
+
+                Args:
+                    row_item (pd.Series): Single row input from Dataframe
+                """
+                item = row_item.get(y_test.columns[0])
+                if isinstance(item, str) or isinstance(item, dict):
+                    return [item]
+                if isinstance(item, list):
+                    return item
+
+            y_test = y_test.apply(check_y_test, axis=1).tolist()
             metrics = compute_metrics(task_type=constants.Tasks.CHAT_COMPLETION, y_pred=y_pred_formatted,
                                       y_test=y_test, **self.metrics_config)
         else:
diff --git a/assets/training/model_evaluation/src/task_factory/text/ner.py b/assets/training/model_evaluation/src/task_factory/text/ner.py
index bfcdcdc00a..4a98b51bcc 100644
--- a/assets/training/model_evaluation/src/task_factory/text/ner.py
+++ b/assets/training/model_evaluation/src/task_factory/text/ner.py
@@ -24,7 +24,12 @@ def ner_predictor_for_transformers(X_test, params=None):
         Returns:
             _type_: _description_
         """
-        transformers_class._override_model_config(params)
+        try:
+            transformers_class._override_model_config(params)
+        except AttributeError:
+            logger.info("Using newer version of mlflow.transformers._TransformersWrapper\
+                        model config override API")
+            transformers_class._merge_model_config_with_params(transformers_class.model_config, params)
         from azureml.evaluate.mlflow.hftransformers._task_based_predictors import NERPredictor
         predictor = NERPredictor(task_type="token-classification", model=transformers_class.pipeline.model,
                                  tokenizer=transformers_class.pipeline.tokenizer,
diff --git a/assets/training/model_evaluation/src_distributed/data_utils.py b/assets/training/model_evaluation/src_distributed/data_utils.py
index ae4100deae..9e74efd3c8 100644
--- a/assets/training/model_evaluation/src_distributed/data_utils.py
+++ b/assets/training/model_evaluation/src_distributed/data_utils.py
@@ -17,9 +17,9 @@
 import glob
 
 from mltable import load
-
-
-from logging_utilities import get_logger
+from exceptions import DataLoaderException
+from error_definitions import BadLabelColumnData
+from logging_utilities import get_logger, get_azureml_exception, log_traceback
 
 logger = get_logger(name=__name__)
 
@@ -180,6 +180,28 @@ def read_multiple_files(path):
     return iter([data])
 
 
+def prepare_chat_data_from_ft_pipeline(data: pd.DataFrame):
+    """Prepare Chat completion data from FT pipeline.
+
+    Args:
+        data: pd.DataFrame
+    """
+    try:
+        messages_col = data[local_constants.LLM_FT_CHAT_COMPLETION_KEY]
+    except Exception as e:
+        logger.error(f"'{local_constants.LLM_FT_CHAT_COMPLETION_KEY}' not found in FT test dataset.")
+        exception = get_azureml_exception(DataLoaderException, BadLabelColumnData, e, error=repr(e))
+        log_traceback(exception, logger)
+        raise exception
+    X_test, y_test = {local_constants.LLM_FT_CHAT_COMPLETION_KEY:[]}, []
+    for message in messages_col.to_list():
+        X_test[local_constants.LLM_FT_CHAT_COMPLETION_KEY].append(message[:-1])
+        y_test.append(message[-1]["content"])
+    X_test = pd.DataFrame(X_test)
+    y_test = pd.Series(y_test)
+    return X_test, y_test.values
+
+
 def prepare_data(data, task, label_column_name=None, _has_multiple_output=False, extra_y_test_cols=None):
     """Prepare data.
 
diff --git a/assets/training/model_evaluation/src_distributed/exceptions.py b/assets/training/model_evaluation/src_distributed/exceptions.py
index 9dd2671deb..a5d9211f5c 100644
--- a/assets/training/model_evaluation/src_distributed/exceptions.py
+++ b/assets/training/model_evaluation/src_distributed/exceptions.py
@@ -4,7 +4,7 @@
 """File to create AzureML Based Exceptions for Model Evaluation."""
 
 from azureml.exceptions import AzureMLException
-from constants import ExceptionLiterals
+from local_constants import ExceptionLiterals
 
 
 class ModelEvaluationException(AzureMLException):
diff --git a/assets/training/model_evaluation/src_distributed/local_constants.py b/assets/training/model_evaluation/src_distributed/local_constants.py
index c87f5db44d..cf3e793c73 100644
--- a/assets/training/model_evaluation/src_distributed/local_constants.py
+++ b/assets/training/model_evaluation/src_distributed/local_constants.py
@@ -12,6 +12,7 @@
 MLTABLE_FILE_NAME = "MLTable"
 LLM_FT_PREPROCESS_FILENAME = "preprocess_args.json"
 LLM_FT_TEST_DATA_KEY = "raw_test_data_fname"
+LLM_FT_CHAT_COMPLETION_KEY = "messages"
 
 # default values
 class ModelPath:
@@ -194,4 +195,10 @@ class TASK:
 FILTER_MODEL_PREDICTION_PARAMS = [
     "tokenizer_config",
     "generator_config"
-]
\ No newline at end of file
+]
+
+class ChatCompletionConstants:
+    """Chat completion constants."""
+
+    OUTPUT = "predictions"
+    OUTPUT_FULL_CONVERSATION = "prediction_appended"
diff --git a/assets/training/model_evaluation/src_distributed/model_prediction.py b/assets/training/model_evaluation/src_distributed/model_prediction.py
index 9675c553b7..c7385b244a 100644
--- a/assets/training/model_evaluation/src_distributed/model_prediction.py
+++ b/assets/training/model_evaluation/src_distributed/model_prediction.py
@@ -21,13 +21,14 @@
 from argparse import ArgumentParser
 from concurrent.futures import ThreadPoolExecutor
 from local_constants import ArgumentLiterals, ModelPath, TEXT_TOKEN_TASKS, PerformanceColumns, FILTER_MODEL_PREDICTION_PARAMS
+from local_constants import LLM_FT_PREPROCESS_FILENAME, LLM_FT_CHAT_COMPLETION_KEY, ChatCompletionConstants
 from itertools import repeat
 from accelerate import PartialState
 import torch.distributed as dist
 from datetime import datetime, timezone
 
-from data_utils import read_model_prediction_data, prepare_data
+from data_utils import read_model_prediction_data, prepare_data, prepare_chat_data_from_ft_pipeline
 from prepare_data import _clean_and_validate_dataset, validate_and_get_columns
 from exceptions import PredictException, DataLoaderException, ModelLoadingException
 from error_definitions import ModelPredictionInternalError, BadModel, BadInputData
@@ -113,7 +114,19 @@ def predict(self, data):
                 data,
             ))
         return self.postprocess(result)
-
+
+
+    def _make_chat_completion_data(self, input_df, last_chats, col_name):
+        appended_data = {col_name:[]}
+        input_rows = input_df.values.tolist()
+        for ind, datarow in enumerate(input_rows):
+            conversation = datarow[0]
+            conversation.append({"role":"assistant", "content":last_chats[ind]})
+            appended_data[col_name].append(conversation)
+        logger.info(f"Final Conversations: {appended_data}")
+        return pd.DataFrame(appended_data)
+
+
     def predict_single(self, data):
         """Predict single batch.
 
@@ -131,19 +144,26 @@ def predict_single(self, data):
             input_texts = X_test.values.tolist()
             if isinstance(input_texts[0], list):
                 if self.task_type == SupportedTask.CHAT_COMPLETION:
-                    input_texts = [i[0] for i in input_texts]
-                input_texts = [i[0] if len(i) == 0 else [j.strip() for j in i] for i in input_texts]
-            data = {
-                "input_data": {
-                    "input_string": input_texts,
-                    "parameters": self.extra_params,
+                    input_data = []
+                    add_generation_prompt = self.extra_params.pop("add_generation_prompt", True)
+                    for itext in input_texts:
+                        input_data.append(self.tokenizer.apply_chat_template(itext[0], tokenize=False, add_generation_prompt=add_generation_prompt))
+                    input_texts = input_data[:]
+                    self.extra_params.update({"return_full_text": False})
+                    payload = MIRPayload(input_texts, self.extra_params, TaskType.CONVERSATIONAL, False)
+                else:
+                    input_texts = [i[0] if len(i) == 1 else [j.strip() for j in i] for i in input_texts]
+                    data = {
+                        "input_data": {
+                            "input_string": input_texts,
+                            "parameters": self.extra_params,
+                        }
                     }
-            }
-            data.update({'task_type': self.task_type})
-            #logger.info(f"Input Data: {data}")
+                    payload = MIRPayload.from_dict(data)
+                    payload.update_params(get_generator_params(payload.params))
+
 
-            payload = MIRPayload.from_dict(data)
-            payload.update_params(get_generator_params(payload.params))
             logger.info(
                 f"Processing new request with parameters: {payload.params}"
             )
@@ -162,7 +182,6 @@ def predict_single(self, data):
             end_ms = time.time() * 1000
             outputs = [res.response for i, res in enumerate(inference_results)]
             pred_probas = [res.scores for res in inference_results]
-            #logger.info(f"Outputs: {outputs}")
             perf_data = [{
                 PerformanceColumns.BATCH_SIZE_COLUMN_NAME: len(input_texts),
                 PerformanceColumns.START_TIME_COLUMN_NAME: datetime.fromtimestamp(start_ms / 1000, timezone.utc).isoformat(),
@@ -173,13 +192,19 @@ def predict_single(self, data):
                 PerformanceColumns.INPUT_CHARACTERS_COLUMN_NAME: len(gt) if isinstance(gt, str) else 1,
                 PerformanceColumns.INPUT_TOKENS_COLUMN_NAME: len(self.tokenizer(gt)) if self.tokenizer is not None else 0
             } for gt, pred in zip(input_texts, outputs)]
-            pred_df = pd.DataFrame(outputs, index=X_test.index, columns=["prediction"])
             pred_proba_df = pd.DataFrame(pred_probas, index=X_test.index)
+            perf_data = pd.DataFrame(perf_data)
+            if self.task_type == SupportedTask.CHAT_COMPLETION or self.task_type == TaskType.CONVERSATIONAL:
+                pred_df = self._make_chat_completion_data(X_test, outputs,
+                                                          col_name=ChatCompletionConstants.OUTPUT_FULL_CONVERSATION)
+                pred_df[ChatCompletionConstants.OUTPUT] = outputs
+                y_test = self._make_chat_completion_data(X_test, y_test, col_name="ground_truth")
+                return pred_df, y_test, perf_data, pred_proba_df
+            pred_df = pd.DataFrame(outputs, index=X_test.index, columns=["prediction"])
             if isinstance(y_test, pd.Series):
                 y_test = y_test.to_frame()
             elif isinstance(y_test, np.ndarray) or isinstance(y_test, list):
                 y_test = pd.DataFrame(y_test, index=X_test.index)
-            perf_data = pd.DataFrame(perf_data)
             return pred_df, y_test, perf_data, pred_proba_df
 
         except Exception as e:
@@ -280,6 +305,10 @@ def load_data(task, test_data, label_column_name, input_column_names, extra_y_te
         all_cols += extra_y_test_cols
 
     data = read_model_prediction_data(file_path=test_data, batch_size=batch_size)
+    if task == SupportedTask.CHAT_COMPLETION and os.path.isdir(test_data) and LLM_FT_PREPROCESS_FILENAME in os.listdir(test_data):
+        logger.info(f"Run from Finetune Pipeline. Fetching chat completion data from {test_data}")
+        data = map(prepare_chat_data_from_ft_pipeline, data)
+        return data
     data = map(_clean_and_validate_dataset, data, repeat(all_cols), repeat(batch_size))
    data = map(prepare_data, data, repeat(task), repeat(label_column_name), repeat(False), repeat(extra_y_test_cols))
 
@@ -389,7 +418,7 @@ def is_fsdp_enabled():
         and strtobool(os.environ.get("FSDP_CPU_RAM_EFFICIENT_LOADING", "False")) == 1
     )
 
-@swallow_all_exceptions
+@swallow_all_exceptions(logger)
 def main():
     """Initialize text-generation-inference server and client."""
     extra_params = {}
@@ -472,7 +501,7 @@ def main():
     )
     if not os.path.exists(tokenizer_path):
         tokenizer_path = model_path
-    engine_config, task_config, default_generator_configs, task_type = build_configs_from_model(
+    engine_config, task_config, default_generator_configs, task_type, model_info = build_configs_from_model(
         mlmodel,
         model_path,
         config_path,
@@ -491,7 +520,7 @@ def main():
             enable_character_counts = True
         extra_params.pop("char_count_per_sample")
     tokenizer = None
-    if task_type in TEXT_TOKEN_TASKS and enable_token_counts:
+    if (task_type in TEXT_TOKEN_TASKS and enable_token_counts) or (task_type == SupportedTask.CHAT_COMPLETION or task_type == TaskType.CONVERSATIONAL):
         tokenizer = load_tokenizer(engine_config["tokenizer"], engine_config["ml_model_info"].get("hf_tokenizer_class", "AutoTokenizer"))
 
     g_fmscorer = FMScore(config)
@@ -532,7 +561,8 @@ def main():
             logger.info(f"Type of each key: {[(k, type(v), len(v)) for k, v in collated_res[0].items()]}")
     y_pred_df, y_test_df, y_perf_df, y_pred_proba_df = _gather_predictions(collated_res)
 
-    y_pred_df.columns = ["predictions"]
+    if task_type != SupportedTask.CHAT_COMPLETION and task_type != TaskType.CONVERSATIONAL:
+        y_pred_df.columns = ["predictions"]
     ground_truth_columns = [label_column_name]
     if extra_y_test_cols is not None:
         ground_truth_columns += extra_y_test_cols
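Note (illustration, not part of the patch): the chat-completion data flow added in data_utils.prepare_chat_data_from_ft_pipeline and ChatCompletionPredictor._make_chat_completion_data can be traced with a toy record. The sketch below re-implements that logic standalone with pandas, reusing the "messages" key and the "prediction_appended"/"predictions" column names introduced in local_constants.py; the conversation text and engine output are made up.

import pandas as pd

# Toy finetuning test record: one conversation stored under the "messages" key
# (LLM_FT_CHAT_COMPLETION_KEY in local_constants.py).
data = pd.DataFrame({"messages": [[
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "Paris."},
]]})

# prepare_chat_data_from_ft_pipeline: everything but the last turn becomes X_test,
# the last assistant turn's content becomes y_test.
X_test = pd.DataFrame({"messages": [m[:-1] for m in data["messages"]]})
y_test = pd.Series([m[-1]["content"] for m in data["messages"]])

# _make_chat_completion_data: append the generated answer back onto the conversation
# ("prediction_appended"), while the bare answers are kept in "predictions".
outputs = ["Paris is the capital of France."]  # stand-in for the inference engine output
appended = [conv + [{"role": "assistant", "content": out}]
            for conv, out in zip(X_test["messages"], outputs)]
pred_df = pd.DataFrame({"prediction_appended": appended, "predictions": outputs})
print(pred_df)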
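Similarly, the predict_single change builds the prompt by flattening each conversation through the tokenizer's chat template before handing it to the inference engine. A minimal sketch of that call, assuming a Hugging Face tokenizer that ships a chat template (the model name is only an example):

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("HuggingFaceH4/zephyr-7b-beta")  # example model
conversation = [{"role": "user", "content": "What is the capital of France?"}]

# tokenize=False returns the templated prompt as a string; add_generation_prompt=True
# (the default the component pops from extra_params) appends the assistant header so
# the model continues with its answer rather than a new user turn.
prompt = tokenizer.apply_chat_template(
    conversation, tokenize=False, add_generation_prompt=True
)
print(prompt)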
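The ner.py change guards against an mlflow.transformers API difference: older _TransformersWrapper builds expose _override_model_config, while newer ones expose _merge_model_config_with_params, as exercised in the hunk above. A rough sketch of that fallback pattern, where apply_model_config and wrapper are hypothetical names standing in for the loaded wrapper instance:

def apply_model_config(wrapper, params):
    """Apply generation params across mlflow.transformers versions (sketch)."""
    try:
        # Older wrapper API: overrides the model config in place.
        wrapper._override_model_config(params)
    except AttributeError:
        # Newer wrapper API: merges params into the existing model config.
        wrapper._merge_model_config_with_params(wrapper.model_config, params)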