From 56e72cf6066f855806ff502ae6cc43416cecfcd5 Mon Sep 17 00:00:00 2001
From: aavarghese
Date: Thu, 12 Dec 2024 11:30:33 -0500
Subject: [PATCH] fix: modify header_cleanser to minimum size import without DPK lib pip

Signed-off-by: aavarghese
---
 .../code/header-cleanser/requirements.txt  |   1 -
 .../src/header_cleanser_transform.py       | 224 ------------------
 .../code/header-cleanser/src/main.py       | 206 +++++++++++++++-
 .../language/pdf2parquet/src/main.py       |   9 +-
 .../pii-redactor/src/flair_recognizer.py   |   1 -
 .../doc-id/src/doc_id_transform_base.py    | 177 --------------
 .../doc-id/src/doc_id_transform_python.py  | 120 ----------
 .../universal/doc-id/src/main.py           | 160 ++++++++++---
 .../universal/resize/src/main.py           |   7 +-
 9 files changed, 320 insertions(+), 585 deletions(-)
 delete mode 100644 demos/data-prep-kit/code/header-cleanser/src/header_cleanser_transform.py
 delete mode 100644 demos/data-prep-kit/universal/doc-id/src/doc_id_transform_base.py
 delete mode 100644 demos/data-prep-kit/universal/doc-id/src/doc_id_transform_python.py

diff --git a/demos/data-prep-kit/code/header-cleanser/requirements.txt b/demos/data-prep-kit/code/header-cleanser/requirements.txt
index 1135c635..7f108d51 100644
--- a/demos/data-prep-kit/code/header-cleanser/requirements.txt
+++ b/demos/data-prep-kit/code/header-cleanser/requirements.txt
@@ -1,4 +1,3 @@
-data-prep-toolkit==0.2.2.dev1
 scancode-toolkit-mini
 # we can probably update to 18+, but we will have to re-generate expected output as pyarrow 18 seems to have resulted in a binary format change

diff --git a/demos/data-prep-kit/code/header-cleanser/src/header_cleanser_transform.py b/demos/data-prep-kit/code/header-cleanser/src/header_cleanser_transform.py
deleted file mode 100644
index d9d0d5a3..00000000
--- a/demos/data-prep-kit/code/header-cleanser/src/header_cleanser_transform.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the “License”);
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an “AS IS” BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-################################################################################
-
-import os
-import tempfile
-from argparse import ArgumentParser, Namespace
-
-import pyarrow as pa
-from data_processing.runtime.pure_python.runtime_configuration import (
-    PythonTransformRuntimeConfiguration,
-)
-from data_processing.transform import AbstractTableTransform, TransformConfiguration
-from data_processing.utils import CLIArgumentProvider, get_logger, str2bool
-from scancode import api
-
-
-logger = get_logger(__name__)
-
-short_name = "header_cleanser"
-cli_prefix = short_name + "_"
-COLUMN_KEY = "contents_column_name"
-LICENSE_KEY = "license"
-COPYRIGHT_KEY = "copyright"
-
-column_cli_params = f"{cli_prefix}{COLUMN_KEY}"
-license_cli_params = f"{cli_prefix}{LICENSE_KEY}"
-copyright_cli_params = f"{cli_prefix}{COPYRIGHT_KEY}"
-
-DEFAULT_COLUMN = "contents"
-DEFAULT_LICENSE = True
-DEFAULT_COPYRIGHT = True
-
-
-def file_generate(content):
-    """
-    Generate temporary file so that it can be passed to scancode-toolkit.
-    """
-    try:
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
-            temp_file.write(content.encode("utf-8"))
-            temp_file_path = temp_file.name
-    except Exception as e:
-        print(f"Failed to create file : {e}")
-    return temp_file_path
-
-
-def fetch_index(dict_data):
-    """
-    Extract License and copyright start and endline from dictonary
-    """
-    ignore_lines = []
-    if dict_data.get("license_detections") != None:
-        for licenses in dict_data.get("license_detections"):
-            for match in licenses.get("matches"):
-                start_line = match["start_line"] - 1
-                end_line = match["end_line"] - 1
-                ignore_lines.extend([i for i in range(start_line, end_line + 1)])
-
-    if dict_data.get("copyrights") != None:
-        for copyrights in dict_data.get("copyrights"):
-            start_line = copyrights.get("start_line") - 1
-            end_line = copyrights.get("end_line") - 1
-            ignore_lines.extend([i for i in range(start_line, end_line + 1)])
-
-    return ignore_lines
-
-
-def check_empty_comment(code, ignore_lines):
-    min_index = min(ignore_lines)
-    max_index = max(ignore_lines)
-    code_list = code.split("\n")
-    if min_index != 0:
-        min_index = min_index - 1
-
-    if max_index <= len(code_list):
-        max_index = max_index + 2
-
-    for index in range(min_index, max_index):
-        if all(
-            not isinstance(x, (int, float, complex))
-            and not isinstance(x, str)
-            or (isinstance(x, str) and not x.isalnum())
-            for x in code_list[index]
-        ):
-            if index not in ignore_lines:
-                ignore_lines.append(index)
-
-    return ignore_lines
-
-
-def remove_copyright(code):
-    """
-    Using scancode.api function to detecte and remove copyright.
-    """
-    file_path = file_generate(content=code)
-    copyright_dict = api.get_copyrights(file_path)
-    os.remove(file_path)
-    ignore_lines = fetch_index(copyright_dict)
-    if ignore_lines != []:
-        modified_code = "\n".join([line for i, line in enumerate(code.split("\n"), 0) if i not in ignore_lines])
-        return modified_code, ignore_lines != []
-    else:
-        return code, False
-
-
-def remove_license(code):
-    """
-    Using scancode.api function to detecte and remove license.
-    """
-    file_path = file_generate(content=code)
-    license_dict = api.get_licenses(file_path)
-    os.remove(file_path)
-    ignore_lines = fetch_index(license_dict)
-    if ignore_lines != []:
-        modified_code = "\n".join([line for i, line in enumerate(code.split("\n"), 0) if i not in ignore_lines])
-        return modified_code, ignore_lines != []
-    else:
-        return code, False
-
-
-def remove_license_copyright(code):
-
-    file_path = file_generate(code)
-    copyright_dict = api.get_copyrights(file_path)
-    license_dict = api.get_licenses(file_path)
-    os.remove(file_path)
-    ignore_lines_license = fetch_index(license_dict)
-    ignore_lines_copyright = fetch_index(copyright_dict)
-    ignore_lines = ignore_lines_license + ignore_lines_copyright
-    if ignore_lines != []:
-        ignore_lines = check_empty_comment(code, ignore_lines)
-        modified_code = "\n".join([line for i, line in enumerate(code.split("\n"), 0) if i not in ignore_lines])
-        return modified_code, True
-    else:
-        return code, False
-
-
-class HeaderCleanserTransform(AbstractTableTransform):
-    def __init__(self, config: dict):
-        super().__init__(config)
-
-        self.column_name = config.get(COLUMN_KEY, DEFAULT_COLUMN)
-        self.license_remove = config.get(LICENSE_KEY, DEFAULT_LICENSE)
-        self.copyright_remove = config.get(COPYRIGHT_KEY, DEFAULT_COPYRIGHT)
-
-    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict]:
-
-        contents = table.column(self.column_name).to_pylist()
-        updated_content = []
-        remove_code_count = 0
-        for content in contents:
-            if self.license_remove and self.copyright_remove:
-                new_content, detect = remove_license_copyright(content)
-                if detect:
-                    remove_code_count += 1
-                updated_content.append(new_content)
-
-            elif self.copyright_remove:
-                new_content, detect = remove_copyright(content)
-                if detect:
-                    remove_code_count += 1
-                updated_content.append(new_content)
-
-            elif self.license_remove:
-                new_content, detect = remove_license(content)
-                if detect:
-                    remove_code_count += 1
-                updated_content.append(new_content)
-
-            else:
-                return [table], {"Removed code count": remove_code_count}
-
-        updated_content = pa.array(updated_content)
-
-        table = table.set_column(table.column_names.index(self.column_name), self.column_name, updated_content)
-
-        return [table], {"Removed code count": remove_code_count}
-
-
-class HeaderCleanserTransformConfiguration(TransformConfiguration):
-    def __init__(self):
-        super().__init__(name="header_cleanser", transform_class=HeaderCleanserTransform)
-
-    def add_input_params(self, parser: ArgumentParser) -> None:
-        parser.add_argument(
-            f"--{column_cli_params}",
-            required=False,
-            type=str,
-            default=f"{DEFAULT_COLUMN}",
-            help="Name of the column holds the data to process",
-        )
-        parser.add_argument(
-            f"--{license_cli_params}",
-            required=False,
-            type=lambda x: bool(str2bool(x)),
-            default=f"{DEFAULT_LICENSE}",
-            help="Set False if license should not be removed",
-        )
-        parser.add_argument(
-            f"--{copyright_cli_params}",
-            required=False,
-            type=lambda x: bool(str2bool(x)),
-            default=f"{DEFAULT_COPYRIGHT}",
-            help="Set False if copyright should not be removed ",
-        )
-
-    def apply_input_params(self, args: Namespace) -> bool:
-        captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
-        self.params = self.params | captured
-        return True
-
-
-class HeaderCleanserPythonTransformConfiguration(PythonTransformRuntimeConfiguration):
-    def __init__(self):
-        super().__init__(transform_config=HeaderCleanserTransformConfiguration())
diff --git a/demos/data-prep-kit/code/header-cleanser/src/main.py b/demos/data-prep-kit/code/header-cleanser/src/main.py
index bbb4cfb3..3bf0c42c 100644
--- a/demos/data-prep-kit/code/header-cleanser/src/main.py
+++ b/demos/data-prep-kit/code/header-cleanser/src/main.py
@@ -14,23 +14,203 @@
 import sys

 import pyarrow.parquet as pq
+import os, tempfile  # os is needed to clean up the temp files handed to scancode
+from argparse import ArgumentParser, Namespace
+
+import pyarrow as pa
+from scancode import api

-from header_cleanser_transform import (
-    COLUMN_KEY,
-    COPYRIGHT_KEY,
-    LICENSE_KEY,
-    HeaderCleanserTransform,
-)
-
-header_cleanser_params = {
-    COLUMN_KEY: "contents",
-    COPYRIGHT_KEY: True,
-    LICENSE_KEY: True,
-}
+short_name = "header_cleanser"
+cli_prefix = short_name + "_"
+COLUMN_KEY = "contents_column_name"
+LICENSE_KEY = "license"
+COPYRIGHT_KEY = "copyright"
+
+column_cli_params = f"{cli_prefix}{COLUMN_KEY}"
+license_cli_params = f"{cli_prefix}{LICENSE_KEY}"
+copyright_cli_params = f"{cli_prefix}{COPYRIGHT_KEY}"
+
+DEFAULT_COLUMN = "contents"
+DEFAULT_LICENSE = True
+DEFAULT_COPYRIGHT = True
+
+def str2bool(value: str) -> bool:
+    """
+    Convert a string to a boolean. Helper for getting boolean parameters.
+    :param value: input string
+    """
+    if value.strip().lower() in ("yes", "true", "t", "y", "1"):
+        return True
+    return False
+
+def file_generate(content):
+    """
+    Generate a temporary file so that it can be passed to scancode-toolkit.
+    """
+    try:
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as temp_file:
+            temp_file.write(content.encode("utf-8"))
+            temp_file_path = temp_file.name
+    except Exception as e:
+        print(f"Failed to create file: {e}")
+    return temp_file_path
+
+
+def fetch_index(dict_data):
+    """
+    Extract the license and copyright start and end lines from the scancode detection dictionary.
+    """
+    ignore_lines = []
+    if dict_data.get("license_detections") is not None:
+        for licenses in dict_data.get("license_detections"):
+            for match in licenses.get("matches"):
+                start_line = match["start_line"] - 1
+                end_line = match["end_line"] - 1
+                ignore_lines.extend([i for i in range(start_line, end_line + 1)])
+
+    if dict_data.get("copyrights") is not None:
+        for copyrights in dict_data.get("copyrights"):
+            start_line = copyrights.get("start_line") - 1
+            end_line = copyrights.get("end_line") - 1
+            ignore_lines.extend([i for i in range(start_line, end_line + 1)])
+
+    return ignore_lines
+
+
+def check_empty_comment(code, ignore_lines):
+    min_index = min(ignore_lines)
+    max_index = max(ignore_lines)
+    code_list = code.split("\n")
+    if min_index != 0:
+        min_index = min_index - 1
+
+    if max_index <= len(code_list):
+        max_index = min(max_index + 2, len(code_list))  # clamp to avoid an IndexError below
+
+    for index in range(min_index, max_index):
+        if all(
+            not isinstance(x, (int, float, complex))
+            and not isinstance(x, str)
+            or (isinstance(x, str) and not x.isalnum())
+            for x in code_list[index]
+        ):
+            if index not in ignore_lines:
+                ignore_lines.append(index)
+
+    return ignore_lines
+
+
+def remove_copyright(code):
+    """
+    Use the scancode.api functions to detect and remove copyright lines.
+    """
+    file_path = file_generate(content=code)
+    copyright_dict = api.get_copyrights(file_path)
+    os.remove(file_path)
+    ignore_lines = fetch_index(copyright_dict)
+    if ignore_lines != []:
+        modified_code = "\n".join([line for i, line in enumerate(code.split("\n"), 0) if i not in ignore_lines])
+        return modified_code, ignore_lines != []
+    else:
+        return code, False
+
+
+def remove_license(code):
+    """
+    Use the scancode.api functions to detect and remove license lines.
+    """
+    file_path = file_generate(content=code)
+    license_dict = api.get_licenses(file_path)
+    os.remove(file_path)
+    ignore_lines = fetch_index(license_dict)
+    if ignore_lines != []:
+        modified_code = "\n".join([line for i, line in enumerate(code.split("\n"), 0) if i not in ignore_lines])
+        return modified_code, ignore_lines != []
+    else:
+        return code, False
+
+
+def remove_license_copyright(code):
+    """Detect and remove both license and copyright lines in one pass."""
+    file_path = file_generate(code)
+    copyright_dict = api.get_copyrights(file_path)
+    license_dict = api.get_licenses(file_path)
+    os.remove(file_path)
+    ignore_lines_license = fetch_index(license_dict)
+    ignore_lines_copyright = fetch_index(copyright_dict)
+    ignore_lines = ignore_lines_license + ignore_lines_copyright
+    if ignore_lines != []:
+        ignore_lines = check_empty_comment(code, ignore_lines)
+        modified_code = "\n".join([line for i, line in enumerate(code.split("\n"), 0) if i not in ignore_lines])
+        return modified_code, True
+    else:
+        return code, False
+
+def add_input_params(parser: ArgumentParser) -> None:
+    parser.add_argument(
+        f"--{column_cli_params}",
+        required=False,
+        type=str,
+        default=f"{DEFAULT_COLUMN}",
+        help="Name of the column that holds the data to process",
+    )
+    parser.add_argument(
+        f"--{license_cli_params}",
+        required=False,
+        type=lambda x: bool(str2bool(x)),
+        default=f"{DEFAULT_LICENSE}",
+        help="Set False if license should not be removed",
+    )
+    parser.add_argument(
+        f"--{copyright_cli_params}",
+        required=False,
+        type=lambda x: bool(str2bool(x)),
+        default=f"{DEFAULT_COPYRIGHT}",
+        help="Set False if copyright should not be removed",
+    )
+
+def apply_input_params(args: Namespace) -> dict:
+    # Capture the transform parameters from the parsed CLI arguments.
+    captured = {k[len(cli_prefix):]: v for k, v in vars(args).items() if k.startswith(cli_prefix)}
+    return captured
+
+column_name = "contents"
+license_remove = True
+copyright_remove = True

 if __name__ == "__main__":
     # Create and configure the transform.
-    transform = HeaderCleanserTransform(header_cleanser_params)
+    def transform(table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict]:
+        contents = table.column(column_name).to_pylist()
+        updated_content = []
+        remove_code_count = 0
+        for content in contents:
+            if license_remove and copyright_remove:
+                new_content, detect = remove_license_copyright(content)
+                if detect:
+                    remove_code_count += 1
+                updated_content.append(new_content)
+
+            elif copyright_remove:
+                new_content, detect = remove_copyright(content)
+                if detect:
+                    remove_code_count += 1
+                updated_content.append(new_content)
+
+            elif license_remove:
+                new_content, detect = remove_license(content)
+                if detect:
+                    remove_code_count += 1
+                updated_content.append(new_content)
+
+            else:
+                return [table], {"Removed code count": remove_code_count}
+
+        updated_content = pa.array(updated_content)
+
+        table = table.set_column(table.column_names.index(column_name), column_name, updated_content)
+
+        return [table], {"Removed code count": remove_code_count}

     try:
         print(f"Reading in parquet file {sys.argv[1]}")
@@ -42,7 +42,7 @@

     print(f"input table has {table.num_rows} rows")
     # Transform the table
-    table_list, metadata = transform.transform(table)
+    table_list, metadata = transform(table)
     print(f"\noutput table has {table_list[0].num_rows} rows")
     print(f"output metadata : {metadata}")
     pq.write_table(table_list[0], sys.argv[2])
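
With the transform inlined, the header-cleanser demo above is self-contained and can be smoke-tested without the DPK runtime. A hedged sketch (file names are placeholders; it assumes a parquet file with the default "contents" column, as the script expects via sys.argv):

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Write a one-row input table, then run the script on it.
    pq.write_table(pa.table({"contents": ["# Copyright (c) 2024 Example Corp\nprint('hi')"]}), "input.parquet")
    # shell: python src/main.py input.parquet output.parquet
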
diff --git a/demos/data-prep-kit/language/pdf2parquet/src/main.py b/demos/data-prep-kit/language/pdf2parquet/src/main.py
index d92abcaa..c8db7396 100644
--- a/demos/data-prep-kit/language/pdf2parquet/src/main.py
+++ b/demos/data-prep-kit/language/pdf2parquet/src/main.py
@@ -32,10 +32,7 @@
 import pandas as pd
 import pyarrow as pa
 import numpy as np
-#from data_processing.transform import AbstractBinaryTransform, TransformConfiguration
-#from data_processing.utils import TransformUtils, get_logger, str2bool
-#from data_processing.utils.cli_utils import CLIArgumentProvider
-#from data_processing.utils.multilock import MultiLock
+
 from docling.backend.docling_parse_backend import DoclingParseDocumentBackend
 from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend
 from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
@@ -50,10 +47,6 @@
 from docling.document_converter import DocumentConverter, InputFormat, PdfFormatOption
 from docling.models.base_ocr_model import OcrOptions

-
-#logger = get_logger(__name__)
-# logger = get_logger(__name__, level="DEBUG")
-
 shortname = "pdf2parquet"
 cli_prefix = f"{shortname}_"
 pdf2parquet_batch_size_key = f"batch_size"
diff --git a/demos/data-prep-kit/language/pii-redactor/src/flair_recognizer.py b/demos/data-prep-kit/language/pii-redactor/src/flair_recognizer.py
index c8df1f48..3e925aff 100644
--- a/demos/data-prep-kit/language/pii-redactor/src/flair_recognizer.py
+++ b/demos/data-prep-kit/language/pii-redactor/src/flair_recognizer.py
@@ -6,7 +6,6 @@
 from typing import List, Optional, Set, Tuple

 import logging
-#from data_processing.utils import get_logger
 from flair.data import Sentence
 from flair.models import SequenceTagger
 from presidio_analyzer import AnalysisExplanation, EntityRecognizer, RecognizerResult
diff --git a/demos/data-prep-kit/universal/doc-id/src/doc_id_transform_base.py b/demos/data-prep-kit/universal/doc-id/src/doc_id_transform_base.py
deleted file mode 100644
index 132a3d96..00000000
--- a/demos/data-prep-kit/universal/doc-id/src/doc_id_transform_base.py
+++ /dev/null
@@ -1,177 +0,0 @@
-# (C) Copyright IBM Corp. 2024.
-# Licensed under the Apache License, Version 2.0 (the “License”);
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an “AS IS” BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from argparse import ArgumentParser, Namespace
-from typing import Any
-
-import pyarrow as pa
-
-from data_processing.transform import AbstractTableTransform, TransformConfiguration
-from data_processing.utils import CLIArgumentProvider, TransformUtils, UnrecoverableException
-
-
-class IDGenerator():
-    """
-    A class maintaining unique integer ids
-    """
-
-    def __init__(self, start: int=0):
-        """
-        Initialization
-        :param start: starting id number
-        """
-        self.id = start
-
-    def get_ids(self, n_rows: int) -> int:
-        """
-        Give out a new portion of integer ids
-        :param n_rows: number of required Ids
-        :return: starting value of blocks of ids
-        """
-        start_id = self.id
-        self.id = self.id + n_rows
-        return start_id
-
-    def get_current(self) -> int:
-        """
-        Give out a new portion of integer ids
-        :return: current value for ID
-        """
-        return self.id
-
-
-short_name = "doc_id"
-cli_prefix = f"{short_name}_"
-doc_column_name_key = "doc_column"
-hash_column_name_key = "hash_column"
-int_column_name_key = "int_column"
-start_id_key = "start_id"
-id_generator_key = "id_generator"
-
-doc_column_name_cli_param = f"{cli_prefix}{doc_column_name_key}"
-hash_column_name_cli_param = f"{cli_prefix}{hash_column_name_key}"
-int_column_name_cli_param = f"{cli_prefix}{int_column_name_key}"
-start_id_cli_param = f"{cli_prefix}{start_id_key}"
-
-doc_column_name_default = "contents"
-
-
-class DocIDTransformBase(AbstractTableTransform):
-    """
-    Implements schema modification of a pyarrow Table.
-    """
-
-    def __init__(self, config: dict[str, Any]):
-        """
-        Initialize based on the dictionary of configuration information.
-        """
-        # Make sure that the param name corresponds to the name used in apply_input_params method
-        super().__init__(config)
-        self.doc_column = config.get(doc_column_name_key, doc_column_name_default)
-        self.hash_column = config.get(hash_column_name_key, None)
-        self.int_column = config.get(int_column_name_key, None)
-        if self.hash_column is None and self.int_column is None:
-            raise UnrecoverableException("At least one of hash or integer column names must be specified.")
-
-    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
-        """
-        Put Transform-specific to convert one Table to 0 or more tables. It also returns
-        a dictionary of execution statistics - arbitrary dictionary
-        This implementation makes no modifications so effectively implements a copy of the
-        input parquet to the output folder, without modification.
-        """
-        TransformUtils.validate_columns(table=table, required=[self.doc_column])
-
-        if self.hash_column is not None:
-            # add doc id column
-            docs = table[self.doc_column]
-            doc_ids = [""] * table.num_rows
-            for n in range(table.num_rows):
-                doc_ids[n] = TransformUtils.str_to_hash(docs[n].as_py())
-            table = TransformUtils.add_column(table=table, name=self.hash_column, content=doc_ids)
-        if self.int_column is not None:
-            # add integer document id
-            sid = self._get_starting_id(table.num_rows)
-            int_doc_ids = list(range(sid, table.num_rows + sid))
-            table = TransformUtils.add_column(table=table, name=self.int_column, content=int_doc_ids)
-        return [table], {}
-
-    def _get_starting_id(self, n_rows: int) -> int:
-        """
-        Get starting Id
-        :param n_rows - number of rows in the table
-        :return: starting id for the table
-        """
-        raise NotImplementedError
-
-
-class DocIDTransformConfigurationBase(TransformConfiguration):
-
-    """
-    Provides support for configuring and using the associated Transform class include
-    configuration with CLI args and combining of metadata.
-    """
-
-    def __init__(self, transform_class: type[AbstractTableTransform]):
-        super().__init__(
-            name=short_name,
-            transform_class=transform_class,
-        )
-        from data_processing.utils import get_logger
-        self.logger = get_logger(__name__)
-
-    def add_input_params(self, parser: ArgumentParser) -> None:
-        """
-        Add Transform-specific arguments to the given parser.
-        This will be included in a dictionary used to initialize the NOOPTransform.
-        By convention a common prefix should be used for all transform-specific CLI args
-        (e.g, noop_, pii_, etc.)
-        """
-        parser.add_argument(
-            f"--{doc_column_name_cli_param}",
-            type=str,
-            default=doc_column_name_default,
-            help="doc column name"
-        )
-        parser.add_argument(
-            f"--{hash_column_name_cli_param}",
-            type=str,
-            default=None,
-            help="Compute document hash and place in the given named column",
-        )
-        parser.add_argument(
-            f"--{int_column_name_cli_param}",
-            type=str,
-            default=None,
-            help="Compute unique integer id and place in the given named column",
-        )
-        parser.add_argument(
-            f"--{start_id_cli_param}",
-            type=int,
-            default=0,
-            help="starting integer id",
-        )
-
-    def apply_input_params(self, args: Namespace) -> bool:
-        """
-        Validate and apply the arguments that have been parsed
-        :param args: user defined arguments.
-        :return: True, if validate pass or False otherwise
-        """
-        captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
-        if captured.get(hash_column_name_key) is None and captured.get(int_column_name_key) is None:
-            self.logger.info("One of hash or int id column names must be specified.")
-            return False
-
-        self.params = self.params | captured
-        self.logger.info(f"Doc id parameters are : {self.params}")
-        return True
\ No newline at end of file
diff --git a/demos/data-prep-kit/universal/doc-id/src/doc_id_transform_python.py b/demos/data-prep-kit/universal/doc-id/src/doc_id_transform_python.py
deleted file mode 100644
index cbc63592..00000000
--- a/demos/data-prep-kit/universal/doc-id/src/doc_id_transform_python.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# (C) Copyright IBM Corp. 2024.
-# Licensed under the Apache License, Version 2.0 (the “License”);
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an “AS IS” BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from argparse import Namespace
-from typing import Any
-
-from data_processing.data_access import DataAccessFactoryBase
-from data_processing.transform import TransformStatistics
-from data_processing.runtime.pure_python import (
-    DefaultPythonTransformRuntime,
-    PythonTransformRuntimeConfiguration,
-    PythonTransformLauncher
-)
-from doc_id_transform_base import (
-    IDGenerator,
-    DocIDTransformBase,
-    DocIDTransformConfigurationBase,
-    start_id_key,
-    id_generator_key
-)
-
-
-class DocIDTransform(DocIDTransformBase):
-    """
-    Implements schema modification of a pyarrow Table.
-    """
-
-    def __init__(self, config: dict[str, Any]):
-        """
-        Initialize based on the dictionary of configuration information.
-        """
-        # Make sure that the param name corresponds to the name used in apply_input_params method
-        super().__init__(config)
-        self.id_generator = config.get(id_generator_key, IDGenerator(config.get(start_id_key, 1)))
-
-    def _get_starting_id(self, n_rows: int) -> int:
-        """
-        Get starting ID
-        :param n_rows - number of rows in the table
-        :return: starting id for the table
-        """
-        return self.id_generator.get_ids(n_rows=n_rows)
-
-
-class DocIDTransformConfiguration(DocIDTransformConfigurationBase):
-
-    def __init__(self):
-        super().__init__(transform_class=DocIDTransform)
-
-    def apply_input_params(self, args: Namespace) -> bool:
-        """
-        Validate and apply the arguments that have been parsed
-        :param args: user defined arguments.
-        :return: True, if validate pass or False otherwise
-        """
-        if args.runtime_num_processors > 0:
-            self.logger.info(
-                f"doc_id does not support multiprocessing. Runtime_num_processors should be 0, "
-                f"current {args.runtime_num_processors}"
-            )
-            return False
-        return super().apply_input_params(args=args)
-
-
-class DocIDRuntime(DefaultPythonTransformRuntime):
-    """
-    Exact dedup runtime support
-    """
-
-    def __init__(self, params: dict[str, Any]):
-        super().__init__(params=params)
-        self.id_generator = None
-
-    def get_transform_config(
-        self, data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, files: list[str]
-    ) -> dict[str, Any]:
-        """
-        Get the dictionary of configuration that will be provided to the transform's initializer.
-        This is the opportunity for this runtime to create a new set of configuration based on the
-        config/params provided to this instance's initializer. This may include the addition
-        of new configuration data such as ray shared memory, new actors, etc., that might be needed and
-        expected by the transform in its initializer and/or transform() methods.
-        :param data_access_factory - data access factory class being used by the RayOrchestrator.
-        :param statistics - reference to statistics actor
-        :param files - list of files to process
-        :return: dictionary of transform init params
-        """
-        self.id_generator = IDGenerator(self.params.get(start_id_key, 1))
-        return self.params | {id_generator_key: self.id_generator}
-
-    def compute_execution_stats(self, stats: TransformStatistics) -> None:
-        """
-        Update/augment the given statistics object with runtime-specific additions/modifications.
-        :param stats: output of statistics as aggregated across all calls to all transforms.
-        :return: job execution statistics. These are generally reported as metadata by the Ray Orchestrator.
-        """
-        # compute and add additional statistics
-        stats.add_stats({"final id": self.id_generator.get_current()})
-
-
-class DocIDPythonTransformRuntimeConfiguration(PythonTransformRuntimeConfiguration):
-    def __init__(self):
-        super().__init__(
-            transform_config=DocIDTransformConfiguration(),
-            runtime_class=DocIDRuntime,
-        )
-
-
-if __name__ == "__main__":
-    launcher = PythonTransformLauncher(DocIDPythonTransformRuntimeConfiguration())
-    launcher.launch()
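
The IDGenerator that disappears with the file above is re-inlined in the doc-id main.py that follows. Its contract is worth a line of illustration: each call hands out a contiguous block of ids and advances the counter. A minimal sketch with hypothetical values:

    gen = IDGenerator(start=5)
    first = gen.get_ids(n_rows=3)   # returns 5; the rows take ids 5, 6, 7
    second = gen.get_ids(n_rows=2)  # returns 8; the rows take ids 8, 9
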
diff --git a/demos/data-prep-kit/universal/doc-id/src/main.py b/demos/data-prep-kit/universal/doc-id/src/main.py
index 0ed18261..84f5df94 100644
--- a/demos/data-prep-kit/universal/doc-id/src/main.py
+++ b/demos/data-prep-kit/universal/doc-id/src/main.py
@@ -12,43 +12,133 @@
 import sys

 import pyarrow.parquet as pq
+import pyarrow as pa
+import hashlib
+from typing import Any
 import os
+import string  # used by normalize_string() below

-from data_processing.data_access import DataAccessLocal
-from doc_id_transform_python import DocIDTransform
-from doc_id_transform_base import (IDGenerator,
-                                   doc_column_name_key,
-                                   hash_column_name_key,
-                                   int_column_name_key,
-                                   id_generator_key,
-                                   )
-
-doc_id_params = {doc_column_name_key: "contents",
-                 hash_column_name_key: "hash_column",
-                 int_column_name_key: "int_id_column",
-                 id_generator_key: IDGenerator(5),
-                 }
-doc_column_name_key = "doc_column"
-hash_column_name_key = "hash_column"
-int_column_name_key = "int_column"
+doc_column = "contents"
+hash_column = "hash_column"
+int_column = "int_id_column"
 start_id_key = "start_id"

-if __name__ == "__main__":
-    # Create and configure the transform.
-    transform = DocIDTransform(doc_id_params)
-
-    try:
-        print(f"Reading in parquet file {sys.argv[1]}")
-        table = pq.read_table(sys.argv[1])
-    except Exception as e:
-        print(f"Error reading table: {e}", file=sys.stderr)
-        exit(1)
-    print(f"Done Reading in parquet file {sys.argv[1]}")
-
-    print(f"input table has {table.num_rows} rows")
-    # Transform the table
-    table_list, metadata = transform.transform(table)
-    print(f"\noutput table has {table_list[0].num_rows} rows")
-    print(f"output metadata : {metadata}")
-    pq.write_table(table_list[0], sys.argv[2])
+class IDGenerator():
+    """
+    A class maintaining unique integer ids
+    """
+
+    def __init__(self, start: int=0):
+        """
+        Initialization
+        :param start: starting id number
+        """
+        self.id = start
+
+    def get_ids(self, n_rows: int) -> int:
+        """
+        Give out a new portion of integer ids
+        :param n_rows: number of required ids
+        :return: starting value of the block of ids
+        """
+        start_id = self.id
+        self.id = self.id + n_rows
+        return start_id
+
+id_generator = IDGenerator(int(os.getenv(start_id_key, 1)))
+
+def add_column(table: pa.Table, name: str, content: list[Any]) -> pa.Table:
+    """
+    Add column to the table
+    :param table: original table
+    :param name: column name
+    :param content: content of the column
+    :return: updated table, containing new column
+    """
+    # check if the column already exists and drop it
+    if name in table.schema.names:
+        table = table.drop(columns=[name])
+    # append column
+    return table.append_column(field_=name, column=[content])
+
+def validate_columns(table: pa.Table, required: list[str]) -> None:
+    """
+    Check if required columns exist in the table
+    :param table: table
+    :param required: list of required columns
+    :return: None
+    """
+    columns = table.schema.names
+    result = True
+    for r in required:
+        if r not in columns:
+            result = False
+            break
+    if not result:
+        raise Exception(
+            f"Not all required columns are present in the table - required {required}, present {columns}"
+        )
+
+def str_to_hash(val: str) -> str:
+    """
+    Compute a string hash
+    :param val: string
+    :return: hash value
+    """
+    return hashlib.sha256(val.encode("utf-8")).hexdigest()
+
+def normalize_string(doc: str) -> str:
+    """
+    Normalize a string
+    :param doc: string to normalize
+    :return: normalized string
+    """
+    return doc.replace(" ", "").replace("\n", "").lower().translate(str.maketrans("", "", string.punctuation))
+
+def _get_starting_id(n_rows: int) -> int:
+    """
+    Get the starting ID
+    :param n_rows: number of rows in the table
+    :return: starting id for the table
+    """
+    return id_generator.get_ids(n_rows=n_rows)
+
+# The transform: add hash-based and/or integer document id columns.
+def transform(table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
+    """
+    Convert one Table to 0 or more tables. Also returns a dictionary of
+    execution statistics - arbitrary dictionary.
+    This implementation adds a hash-based document id column and/or an
+    integer document id column to the input table.
+    """
+    validate_columns(table=table, required=[doc_column])
+
+    if hash_column is not None:
+        # add doc id column
+        docs = table[doc_column]
+        doc_ids = [""] * table.num_rows
+        for n in range(table.num_rows):
+            doc_ids[n] = str_to_hash(docs[n].as_py())
+        table = add_column(table=table, name=hash_column, content=doc_ids)
+    if int_column is not None:
+        # add integer document id
+        sid = _get_starting_id(table.num_rows)
+        int_doc_ids = list(range(sid, table.num_rows + sid))
+        table = add_column(table=table, name=int_column, content=int_doc_ids)
+    return [table], {}
+
+try:
+    print(f"Reading in parquet file {sys.argv[1]}")
+    table = pq.read_table(sys.argv[1])
+except Exception as e:
+    print(f"Error reading table: {e}", file=sys.stderr)
+    exit(1)
+print(f"Done Reading in parquet file {sys.argv[1]}")
+
+print(f"input table has {table.num_rows} rows")
+# Transform the table
+table_list, metadata = transform(table, sys.argv[1])
+print(f"\noutput table has {table_list[0].num_rows} rows")
+print(f"output metadata : {metadata}")
+pq.write_table(table_list[0], sys.argv[2])
diff --git a/demos/data-prep-kit/universal/resize/src/main.py b/demos/data-prep-kit/universal/resize/src/main.py
index 11c4367c..49044d96 100644
--- a/demos/data-prep-kit/universal/resize/src/main.py
+++ b/demos/data-prep-kit/universal/resize/src/main.py
@@ -19,12 +19,7 @@
 from typing import Any

 import pyarrow as pa
-#from data_processing.utils import (
-#    LOCAL_TO_DISK,
-#    MB,
-#    CLIArgumentProvider,
-#    UnrecoverableException
-#)
+
 LOCAL_TO_DISK = 2
 KB = 1024
 MB = 1024 * KB