From 01aed4e5a44f2b0612e888cadb8dae5f29c3674b Mon Sep 17 00:00:00 2001 From: Vitor Date: Fri, 16 Jun 2023 16:22:45 -0300 Subject: [PATCH] create function to create table from teiid --- fastetl/custom_functions/fast_etl.py | 203 +--------- .../utils/config/types_mapping.yml | 99 +++-- .../custom_functions/utils/create_table.py | 365 ++++++++++++++++++ .../custom_functions/utils/db_connection.py | 116 +++++- .../custom_functions/utils/table_comments.py | 41 +- fastetl/custom_functions/utils/teiid.py | 100 ----- setup.py | 2 +- 7 files changed, 556 insertions(+), 370 deletions(-) create mode 100644 fastetl/custom_functions/utils/create_table.py delete mode 100644 fastetl/custom_functions/utils/teiid.py diff --git a/fastetl/custom_functions/fast_etl.py b/fastetl/custom_functions/fast_etl.py index d7f0478..0a8a23b 100644 --- a/fastetl/custom_functions/fast_etl.py +++ b/fastetl/custom_functions/fast_etl.py @@ -9,19 +9,14 @@ from typing import Union, Tuple, Dict import logging import pandas as pd -from pandas.io.sql import DatabaseError -from psycopg2 import OperationalError -from sqlalchemy.exc import NoSuchModuleError -from sqlalchemy import Table, Column, MetaData -from sqlalchemy.engine import reflection -from sqlalchemy.sql import sqltypes as sa_types -import sqlalchemy.dialects as sa_dialects from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook from fastetl.custom_functions.utils.db_connection import ( DbConnection, + SourceConnection, + DestinationConnection, get_conn_type, get_mssql_odbc_engine, get_hook_and_engine_by_provider, @@ -31,68 +26,7 @@ from fastetl.custom_functions.utils.get_table_cols_name import ( get_table_cols_name, ) - - -class SourceConnection: - """Represents a source connection to a database, encapsulating the - connection details (e.g., connection ID, schema, table, query) - required to read data from a database. - - Args: - conn_id (str): The unique identifier of the connection to use. - schema (str, optional): The name of the schema to use. - Default is None. - table (str, optional): The name of the table to use. - Default is None. - query (str, optional): The SQL query to use. Default is None. - - Raises: - ValueError: If `conn_id` is empty or if neither `query` nor - (`schema` and `table`) is provided. - - Attributes: - conn_id (str): The unique identifier of the connection. - schema (str): The name of the schema. - table (str): The name of the table. - query (str): The SQL query. - """ - - def __init__( - self, conn_id: str, schema: str = None, table: str = None, query: str = None - ): - - if not conn_id: - raise ValueError("conn_id argument cannot be empty") - if not query and not (schema or table): - raise ValueError("must provide either schema and table or query") - - self.conn_id = conn_id - self.schema = schema - self.table = table - self.query = query - - -class DestinationConnection: - """Represents a destination connection to a database, encapsulating - the connection details (e.g., connection ID, schema, table) required - to write data to a database. - - Args: - conn_id (str): The unique identifier of the connection to use. - schema (str): The name of the schema to use. - table (str): The name of the table to use. - - Attributes: - conn_id (str): The unique identifier of the connection. - schema (str): The name of the schema. - table (str): The name of the table. 
- """ - - def __init__(self, conn_id: str, schema: str, table: str): - - self.conn_id = conn_id - self.schema = schema - self.table = table +from fastetl.custom_functions.utils.create_table import create_table_if_not_exists def build_select_sql(schema: str, table: str, column_list: str) -> str: @@ -161,121 +95,6 @@ def insert_df_to_db( index=False, ) - -def create_table_if_not_exists( - source: SourceConnection, - destination: DestinationConnection, - copy_table_comments: bool, -) -> None: - """Creates a destination table if it does not already exist and copies - data from a source table to the destination. - - Args: - source (SourceConnection): A `SourceConnection` object containing - the connection details for the source database. - destination (DestinationConnection): A `DestinationConnection` - object containing the connection details for the destination - database. - copy_table_comments (bool): A flag indicating whether to copy table - and columns comments/descriptions. - - Returns: - None. - - Raises: - DatabaseError: If there is an error with the database connection or - query. - OperationalError: If there is an error with the database operation. - NoSuchModuleError: If a required module is missing. - """ - def _check_is_teiid(conn_id): - - sql = "SELECT * FROM SYS.Tables WHERE 1=2" - hook = PostgresHook(conn_id) - check = hook.get_first(sql)[0] - if check: - return True - - return False - - def _convert_column(old_col: Column, db_provider: str) -> Column: - """Convert column type. - - Args: - old_col (Column): Column to convert type. - db_provider (str): Connection type. If `mssql` or `postgres`. - - Returns: - Column: Column with converted type. - """ - - type_mapping = { - "NUMERIC": sa_types.Numeric(38, 13), - "BIT": sa_types.Boolean(), - } - - if db_provider == "mssql": - type_mapping["DATETIME"] = sa_dialects.mssql.DATETIME2() - - return Column( - old_col["name"], - type_mapping.get( - str(old_col["type"]._type_affinity()), old_col["type"]._type_affinity() - ), - ) - if _check_is_teiid(source.conn_id): - create_table_from_teiid - else: - destination_provider = get_conn_type(destination.conn_id) - - ERROR_TABLE_DOES_NOT_EXIST = { - "mssql": "Invalid object name", - "postgres": "does not exist", - } - _, source_eng = get_hook_and_engine_by_provider(source.conn_id) - destination_hook, destination_eng = get_hook_and_engine_by_provider( - destination.conn_id - ) - try: - destination_hook.get_pandas_df( - f"select * from {destination.schema}.{destination.table} where 1=2" - ) - except (DatabaseError, OperationalError, NoSuchModuleError) as db_error: - if not ERROR_TABLE_DOES_NOT_EXIST[destination_provider] in str(db_error): - raise db_error - # Table does not exist so we create it - source_eng.echo = True - try: - insp = reflection.Inspector.from_engine(source_eng) - except AssertionError as e: # pylint: disable=invalid-name - logging.error( - "Cannot create the table automatically from this database." - "Please create the table manually to execute data copying." 
- ) - raise e - - generic_columns = insp.get_columns(source.table, source.schema) - dest_columns = [ - _convert_column(c, destination_provider) for c in generic_columns - ] - - destination_meta = MetaData(bind=destination_eng) - Table( - destination.table, - destination_meta, - *dest_columns, - schema=destination.schema, - ) - - destination_meta.create_all(destination_eng) - - if copy_table_comments: - _copy_table_comments( - source=source, - destination=destination, - ) - - def _copy_table_comments( source: SourceConnection, destination: DestinationConnection ) -> None: @@ -442,14 +261,10 @@ def copy_db_to_db( destination = DestinationConnection(**destination) # create table if not exists in destination db - create_table_if_not_exists( - source, - destination, - copy_table_comments, - ) + create_table_if_not_exists(source, destination) - source_provider = get_conn_type(source.conn_id) - destination_provider = get_conn_type(destination.conn_id) + if copy_table_comments: + _copy_table_comments(source, destination) # create connections with DbConnection(source.conn_id) as source_conn: @@ -457,7 +272,7 @@ def copy_db_to_db( with source_conn.cursor() as source_cur: with destination_conn.cursor() as destination_cur: # Fast etl - if destination_provider == "mssql": + if destination.conn_type == "mssql": destination_conn.autocommit = False destination_cur.fast_executemany = True wildcard_symbol = "?" @@ -485,13 +300,13 @@ def copy_db_to_db( ) # remove quotes for mysql compatibility - if source_provider == "mysql": + if source.conn_type == "mysql": select_sql = select_sql.replace('"', "") # truncate stage if destination_truncate: destination_cur.execute(truncate) - if destination_provider == "mssql": + if destination.conn_type == "mssql": destination_cur.commit() # download data diff --git a/fastetl/custom_functions/utils/config/types_mapping.yml b/fastetl/custom_functions/utils/config/types_mapping.yml index 23a5c31..e64adbc 100644 --- a/fastetl/custom_functions/utils/config/types_mapping.yml +++ b/fastetl/custom_functions/utils/config/types_mapping.yml @@ -1,118 +1,107 @@ teiid: string: postgres: + IsLengthFixed: dtype: - IsLengthFixed: - - true: CHAR - - false: VARCHAR + true: CHAR + false: VARCHAR max_length: - - 8000: MAX + 8000: MAX + length_columns: + - Length mssql: + IsLengthFixed: dtype: - IsLengthFixed: - - true: CHAR - - false: VARCHAR + true: CHAR + false: VARCHAR max_length: - - 8000: MAX + 8000: MAX + length_columns: + - Length integer: postgres: dtype: - - 'INT' + INT mssql: dtype: - - 'INT' + INT long: postgres: dtype: - - 'BIGINT' + BIGINT mssql: dtype: - - 'BIGINT' + BIGINT short: postgres: dtype: - - 'SMALLINT' + SMALLINT mssql: dtype: - - 'SMALLINT' + SMALLINT boolean: postgres: dtype: - - 'BOOLEAN' + BOOLEAN mssql: dtype: - - 'BIT' + BIT float: postgres: dtype: - - 'FLOAT8' + FLOAT8 mssql: dtype: - - 'FLOAT' + FLOAT double: postgres: dtype: - - 'FLOAT8' + FLOAT8 mssql: dtype: - - 'FLOAT' + FLOAT decimal: postgres: dtype: - - 'DECIMAL' - max_width: - - 32767 : '(38, 8)' + DECIMAL + max_length: + 32767 : '38, 8' + length_columns: + - Precision + - Scale mssql: dtype: - - 'NUMERIC' - max_width: - - 32767 : '(38, 8)' + NUMERIC + max_length: + 32767 : '38, 8' + length_columns: + - Precision + - Scale bigdecimal: postgres: dtype: - - 'FLOAT8' - mssql: - dtype: - - 'FLOAT' - date: - postgres: - dtype: - - 'DATE' - mssql: - dtype: - - 'DATE' - time: - postgres: - dtype: - - 'TIME' + FLOAT8 mssql: dtype: - - 'TIME' + FLOAT timestamp: postgres: dtype: - - 'TIMESTAMP' + TIMESTAMP mssql: 
dtype:
-    - 'DATETIME2'
+      DATETIME2
   object:
     postgres:
       dtype:
-    - 'BYTEA'
+      BYTEA
     mssql:
       dtype:
-    - 'VARBINARY'
+      VARBINARY
   xml:
     postgres:
       dtype:
-    - 'VARBINARY'
-    mssql:
-      dtype:
-    - 'VARBINARY'
-  geometry:
-    postgres:
-      dtype:
-    - 'GEOMETRY'
+      VARBINARY
     mssql:
       dtype:
-    - 'GEOMETRY'
+      VARBINARY
diff --git a/fastetl/custom_functions/utils/create_table.py b/fastetl/custom_functions/utils/create_table.py
new file mode 100644
index 0000000..e73c74c
--- /dev/null
+++ b/fastetl/custom_functions/utils/create_table.py
@@ -0,0 +1,365 @@
+"""
+Create a new table on the destination database based on the source database
+table layout.
+
+Works from:
+    - postgres
+    - teiid
+    - mssql
+To:
+    - postgres
+    - mssql
+"""
+
+import os
+import logging
+import yaml
+import pandas as pd
+import psycopg2
+import pymssql
+
+from sqlalchemy import Table, Column, MetaData
+from sqlalchemy.engine import reflection
+from sqlalchemy.sql import sqltypes as sa_types
+import sqlalchemy.dialects as sa_dialects
+
+from airflow.hooks.base import BaseHook
+
+from fastetl.custom_functions.utils.db_connection import (
+    SourceConnection,
+    DestinationConnection,
+    get_hook_and_engine_by_provider,
+)
+
+
+def _execute_query(conn_id, query):
+    """Executes a SQL query using the specified database connection.
+
+    Args:
+        conn_id (str): The connection ID or name of the database connection.
+        query (str): The SQL query to execute.
+
+    Raises:
+        Exception: If there is an error while executing the query.
+    """
+
+    conn = BaseHook.get_connection(conn_id)
+    hook = conn.get_hook()
+    hook.run(query)
+
+
+def _create_table_ddl(destination: DestinationConnection, df: pd.DataFrame):
+    """Generates a Data Definition Language (DDL) query to create a
+    table based on a pandas DataFrame.
+
+    Args:
+        destination (DestinationConnection): The destination database
+            connection object containing information about the schema
+            and table where the table will be created.
+        df (pd.DataFrame): The pandas DataFrame containing the column
+            information for the table.
+
+    Returns:
+        str: The DDL query to create the table.
+    """
+
+    sql_columns = []
+    for _, row in df.iterrows():
+        sql_columns.append(
+            f"{row['Name']} {row['DataType']}{row['converted_length']}"
+        )
+
+    query = (
+        f"CREATE TABLE {destination.schema}.{destination.table} ("
+        f"{', '.join(sql_columns)}"
+        ");"
+    )
+
+    return query
+
+
+def _convert_datatypes(
+    row: pd.Series,
+    types_mapping: dict,
+    source_conn_type: str,
+    destination_conn_type: str,
+) -> pd.Series:
+    """Converts the row (pd.Series) columns `DataType` and
+    `converted_length` based on the from-to mapping defined in the
+    `types_mapping` dictionary.
+
+    Args:
+        row (pd.Series): table column metadata.
+        types_mapping (dict): dictionary with database column metadata
+            (datatypes).
+        source_conn_type (str): source table database connection type,
+            as `postgres`, `mssql` or `teiid`.
+        destination_conn_type (str): destination table database
+            connection type, as `postgres`, `mssql` or `teiid`.
+
+    Returns:
+        pd.Series: row with updated/converted `DataType` and
+            `converted_length` values.
+    """
+
+    if row["DataType"] in types_mapping[source_conn_type]:
+        types_node = types_mapping[source_conn_type][row["DataType"]][
+            destination_conn_type
+        ]
+
+        if "IsLengthFixed" in types_node:
+            row["DataType"] = types_node["dtype"][row["IsLengthFixed"]]
+        else:
+            row["DataType"] = types_node["dtype"]
+
+        if "length_columns" in types_node:
+            length_columns = types_node["length_columns"]
+            values = [str(row[key]) for key in length_columns]
+            row["converted_length"] = f"({','.join(values)})"
+
+        if "max_length" in types_node:
+            max_length, mapped_length = next(
+                iter(types_node["max_length"].items())
+            )
+            # uses only the first length_column of the list to compare
+            # with the column max_length
+            if row[length_columns[0]] >= max_length:
+                row["converted_length"] = f"({mapped_length})"
+
+    return row
+
+
+def _load_yaml(file_name: str) -> dict:
+    """Loads a YAML file and returns its contents as a dictionary.
+
+    Args:
+        file_name (str): The name of the YAML file to load.
+
+    Returns:
+        dict: A dictionary containing the contents of the YAML file.
+
+    Raises:
+        FileNotFoundError: If the specified file does not exist.
+        yaml.YAMLError: If there is an error while parsing the YAML file.
+    """
+
+    current_path = os.path.dirname(__file__)
+    # use a context manager so the file handle is always closed
+    with open(
+        os.path.join(current_path, file_name),
+        encoding="utf-8",
+    ) as yaml_file:
+        yaml_dict = yaml.safe_load(yaml_file)
+
+    return yaml_dict
+
+
+def _get_teiid_columns_datatype(source: SourceConnection) -> pd.DataFrame:
+    """Retrieves table column information, with data types, from a Teiid
+    source database.
+
+    Args:
+        source (SourceConnection): A `SourceConnection` object containing
+            the connection details for the source database.
+
+    Returns:
+        pd.DataFrame: A pandas DataFrame containing the retrieved column
+            information.
+    """
+
+    conn = BaseHook.get_connection(source.conn_id)
+    hook = conn.get_hook()
+
+    rows = hook.get_pandas_df(
+        f"""SELECT
+                TableName,
+                Name,
+                DataType,
+                Scale,
+                Length,
+                IsLengthFixed,
+                "Precision",
+                Description
+            FROM
+                SYS.Columns
+            WHERE
+                VDBName = '{source.conn_database}'
+                and SchemaName = '{source.schema}'
+                and TableName IN ('{source.table}')
+        """
+    )
+
+    rows.replace({'"': "", "'": ""}, regex=True, inplace=True)
+
+    return rows
+
+
+def create_table_from_teiid(
+    source: SourceConnection, destination: DestinationConnection
+):
+    """Creates the table at the destination database when the source
+    database conn_type is `teiid`.
+
+    Args:
+        source (SourceConnection): A `SourceConnection` object containing
+            the connection details for the source database.
+        destination (DestinationConnection): A `DestinationConnection`
+            object containing the connection details for the destination
+            database.
+    """
+
+    df_source_columns = _get_teiid_columns_datatype(source)
+    df_source_columns["converted_length"] = ""
+    types_mapping = _load_yaml("config/types_mapping.yml")
+    df_destination_columns = df_source_columns.apply(
+        _convert_datatypes,
+        args=(
+            types_mapping,
+            source.conn_type,
+            destination.conn_type,
+        ),
+        axis=1,
+    )
+    table_ddl = _create_table_ddl(destination, df_destination_columns)
+    _execute_query(destination.conn_id, table_ddl)
+
+
+def create_table_from_others(
+    source: SourceConnection, destination: DestinationConnection
+):
+    """Creates the destination table by reflecting the source table
+    layout with SQLAlchemy. Works only with postgres and mssql sources.
+
+    Args:
+        source (SourceConnection): A `SourceConnection` object containing
+            the connection details for the source database.
+        destination (DestinationConnection): A `DestinationConnection`
+            object containing the connection details for the destination
+            database.
+
+    Returns:
+        None.
+
+    Raises:
+        AssertionError: If the source table layout cannot be reflected
+            automatically from the source database.
+    """
+
+    def _convert_column(old_col: Column, db_provider: str):
+        """Converts the column type.
+
+        Args:
+            old_col (Column): Column whose type will be converted.
+            db_provider (str): Connection type, either `mssql` or
+                `postgres`.
+
+        Returns:
+            Column: Column with the converted type.
+        """
+
+        type_mapping = {
+            "NUMERIC": sa_types.Numeric(38, 13),
+            "BIT": sa_types.Boolean(),
+        }
+
+        if db_provider == "mssql":
+            type_mapping["DATETIME"] = sa_dialects.mssql.DATETIME2()
+
+        return Column(
+            old_col["name"],
+            type_mapping.get(
+                str(old_col["type"]._type_affinity()),
+                old_col["type"]._type_affinity(),
+            ),
+        )
+
+    # Table does not exist, so we create it
+    _, source_eng = get_hook_and_engine_by_provider(source.conn_id)
+    _, destination_eng = get_hook_and_engine_by_provider(destination.conn_id)
+    source_eng.echo = True
+    try:
+        insp = reflection.Inspector.from_engine(source_eng)
+    except AssertionError as e:  # pylint: disable=invalid-name
+        logging.error(
+            "Cannot create the table automatically from this database. "
+            "Please create the table manually to execute data copying."
+        )
+        raise e
+
+    generic_columns = insp.get_columns(source.table, source.schema)
+    dest_columns = [
+        _convert_column(c, destination.conn_type) for c in generic_columns
+    ]
+
+    destination_meta = MetaData(bind=destination_eng)
+    Table(
+        destination.table,
+        destination_meta,
+        *dest_columns,
+        schema=destination.schema,
+    )
+
+    destination_meta.create_all(destination_eng)
+
+
+def _check_if_table_exists(destination: DestinationConnection) -> bool:
+    """Checks if a table exists at the specified database connection.
+
+    Args:
+        destination (DestinationConnection): A `DestinationConnection`
+            object containing the connection details for the destination
+            database.
+
+    Returns:
+        bool: True if the table exists, False otherwise.
+
+    Raises:
+        Exception: If there is an error while checking for table existence.
+    """
+
+    conn = BaseHook.get_connection(destination.conn_id)
+    hook = conn.get_hook()
+
+    try:
+        # Query to check if the table exists
+        query = f"""
+            SELECT *
+            FROM {destination.schema}.{destination.table}
+            WHERE 1=2
+        """
+        hook.get_first(query)
+        return True
+    except (  # pylint: disable=invalid-name
+        psycopg2.errors.UndefinedTable,
+        pymssql._pymssql.ProgrammingError,
+    ) as e:
+        logging.info(
+            "Table `%s.%s` does not exist at connection `%s`. Exception: %s",
+            destination.schema,
+            destination.table,
+            destination.conn_id,
+            e,
+        )
+        return False
+
+
+def create_table_if_not_exists(
+    source: SourceConnection, destination: DestinationConnection
+):
+    """Creates the table at the destination database, based on the source
+    database table, if it does not already exist.
+
+    Args:
+        source (SourceConnection): A `SourceConnection` object containing
+            the connection details for the source database.
+        destination (DestinationConnection): A `DestinationConnection`
+            object containing the connection details for the destination
+            database.
+
+    To Do:
+        * Refactor `create_table_from_teiid(source, destination)` so it
+        also covers the `create_table_from_others(source, destination)`
+        scenarios (source tables in mssql and postgres databases)
+    """
+
+    if not _check_if_table_exists(destination):
+        if source.conn_type == "teiid":
+            create_table_from_teiid(source, destination)
+        else:
+            create_table_from_others(source, destination)
diff --git a/fastetl/custom_functions/utils/db_connection.py b/fastetl/custom_functions/utils/db_connection.py
index ff218f5..ef8796b 100644
--- a/fastetl/custom_functions/utils/db_connection.py
+++ b/fastetl/custom_functions/utils/db_connection.py
@@ -7,6 +7,7 @@
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, URL
 import pyodbc
+import psycopg2
 
 from airflow.hooks.base import BaseHook
 from airflow.providers.common.sql.hooks.sql import DbApiHook
@@ -37,12 +38,16 @@ def __enter__(self):
             try:
                 self.conn = pyodbc.connect(self.mssql_conn_string)
             except Exception as exc:
-                raise Exception(f"{self.conn_type} connection failed.") from exc
+                raise Exception(
+                    f"{self.conn_type} connection failed."
+                ) from exc
         else:
             try:
                 self.conn = self.hook.get_conn()
             except Exception as exc:
-                raise Exception(f"{self.conn_type} connection failed.") from exc
+                raise Exception(
+                    f"{self.conn_type} connection failed."
+                ) from exc
 
         return self.conn
 
@@ -50,6 +55,79 @@ def __exit__(self, exc_type, exc_value, traceback):
         self.conn.close()
 
 
+class SourceConnection:
+    """Represents a source connection to a database, encapsulating the
+    connection details (e.g., connection ID, schema, table, query)
+    required to read data from a database.
+
+    Args:
+        conn_id (str): The unique identifier of the connection to use.
+        schema (str, optional): The name of the schema to use.
+            Default is None.
+        table (str, optional): The name of the table to use.
+            Default is None.
+        query (str, optional): The SQL query to use. Default is None.
+
+    Raises:
+        ValueError: If `conn_id` is empty or if neither `query` nor
+            (`schema` and `table`) is provided.
+
+    Attributes:
+        conn_id (str): The unique identifier of the connection.
+        schema (str): The name of the schema.
+        table (str): The name of the table.
+        query (str): The SQL query.
+        conn_type (str): Connection type/provider.
+        conn_database (str): Database name, taken from the Airflow
+            connection `schema` field.
+    """
+
+    def __init__(
+        self,
+        conn_id: str,
+        schema: str = None,
+        table: str = None,
+        query: str = None,
+    ):
+        if not conn_id:
+            raise ValueError("conn_id argument cannot be empty")
+        if not query and not (schema and table):
+            raise ValueError("must provide either schema and table or query")
+
+        self.conn_id = conn_id
+        self.schema = schema
+        self.table = table
+        self.query = query
+        self.conn_type = get_conn_type(conn_id)
+        conn_values = BaseHook.get_connection(conn_id)
+        self.conn_database = conn_values.schema
+
+
+class DestinationConnection:
+    """Represents a destination connection to a database, encapsulating
+    the connection details (e.g., connection ID, schema, table) required
+    to write data to a database.
+
+    Args:
+        conn_id (str): The unique identifier of the connection to use.
+        schema (str): The name of the schema to use.
+        table (str): The name of the table to use.
+
+    Attributes:
+        conn_id (str): The unique identifier of the connection.
+        schema (str): The name of the schema.
+        table (str): The name of the table.
+        conn_type (str): Connection type/provider.
+        conn_database (str): Database name, taken from the Airflow
+            connection `schema` field.
+    """
+
+    def __init__(self, conn_id: str, schema: str, table: str):
+        self.conn_id = conn_id
+        self.schema = schema
+        self.table = table
+        self.conn_type = get_conn_type(conn_id)
+        conn_values = BaseHook.get_connection(conn_id)
+        self.conn_database = conn_values.schema
+
+
 def get_mssql_odbc_conn_str(conn_id: str, raw_str: bool = False) -> str:
     """
     Creates a default SQL Server database connection string
@@ -78,7 +156,9 @@ def get_mssql_odbc_conn_str(conn_id: str, raw_str: bool = False) -> str:
     if raw_str:
         return mssql_conn_str
 
-    connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": mssql_conn_str})
+    connection_url = URL.create(
+        "mssql+pyodbc", query={"odbc_connect": mssql_conn_str}
+    )
 
     return connection_url
 
@@ -120,6 +200,31 @@ def get_hook_and_engine_by_provider(conn_id: str) -> Tuple[DbApiHook, Engine]:
     return hook, engine
 
 
+def check_is_teiid(conn_id: str) -> bool:
+    """Checks if a given connection is a Teiid connection.
+
+    Args:
+        conn_id (str): The connection ID or name.
+
+    Returns:
+        bool: True if the connection is a Teiid connection,
+            False otherwise.
+
+    Raises:
+        Exception: If there is an error while checking the connection.
+    """
+
+    conn = BaseHook.get_connection(conn_id)
+    hook = conn.get_hook()
+
+    try:
+        # query SYS.Tables, a system view that exists only on Teiid
+        hook.get_first("SELECT * FROM SYS.Tables WHERE 1=2")
+        return True
+    except psycopg2.errors.UndefinedTable:
+        return False
+
+
 def get_conn_type(conn_id: str) -> str:
     """Get connection type from Airflow connections.
 
@@ -127,10 +232,13 @@ def get_conn_type(conn_id: str) -> str:
         conn_id (str): Airflow connection id.
 
     Returns:
-        str: type of connection. Ex: mssql, postgres, ...
+        str: type of connection. Ex: mssql, postgres, teiid, ...
     """
 
     conn_values = BaseHook.get_connection(conn_id)
     conn_type = conn_values.conn_type
+    if conn_type == "postgres" and check_is_teiid(conn_id):
+        conn_type = "teiid"
+
     return conn_type
diff --git a/fastetl/custom_functions/utils/table_comments.py b/fastetl/custom_functions/utils/table_comments.py
index 5c14cae..2e71f6d 100644
--- a/fastetl/custom_functions/utils/table_comments.py
+++ b/fastetl/custom_functions/utils/table_comments.py
@@ -12,8 +12,13 @@
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
 from airflow.providers.postgres.hooks.postgres import PostgresHook
 
-from fastetl.custom_functions.utils.db_connection import get_hook_and_engine_by_provider
-from fastetl.custom_functions.utils.get_table_cols_name import get_table_cols_name
+from fastetl.custom_functions.utils.db_connection import (
+    get_hook_and_engine_by_provider,
+    get_conn_type,
+)
+from fastetl.custom_functions.utils.get_table_cols_name import (
+    get_table_cols_name,
+)
 
 
 class TableComments:
@@ -36,7 +41,7 @@ def __init__(self, conn_id: str, schema: str, table: str):
         self.schema = schema
         self.table = table
         conn_values = BaseHook.get_connection(conn_id)
-        self.conn_type = conn_values.conn_type
+        self.conn_type = get_conn_type(conn_id)
         self.conn_database = conn_values.schema
         self.table_comments_init = pd.DataFrame(
             columns=["database_level", "name", "comment"]
@@ -91,7 +96,9 @@ def _get_mssql_table_comments(self) -> pd.DataFrame:
             )
             rows_df["comment"] = rows_df["comment"].str.decode("utf-8")
             rows_df["database_level"] = database_level
-            table_comments = pd.concat([table_comments, rows_df], ignore_index=True)
+            table_comments = pd.concat(
+                [table_comments, rows_df], ignore_index=True
+            )
 
         return table_comments
 
@@ -129,7 +136,9 @@ def _get_pg_table_comments(self) -> pd.DataFrame:
             ignore_index=True,
        )
- columns_info = inspector.get_columns(table_name=self.table, schema=self.schema) + columns_info = inspector.get_columns( + table_name=self.table, schema=self.schema + ) for row in columns_info: table_comments = pd.concat( [ @@ -181,10 +190,13 @@ def _get_teiid_table_comments(self) -> pd.DataFrame: for database_level, query in queries.items(): rows_df = pg_hook.get_pandas_df(query) rows_df.rename( - columns={"Name": "name", "Description": "comment"}, inplace=True + columns={"Name": "name", "Description": "comment"}, + inplace=True, ) rows_df["database_level"] = database_level - table_comments = pd.concat([table_comments, rows_df], ignore_index=True) + table_comments = pd.concat( + [table_comments, rows_df], ignore_index=True + ) return table_comments @@ -383,7 +395,9 @@ def _put_pg_table_comments(self) -> None: if not comment.empty: op.create_table_comment( - table_name=self.table, schema=self.schema, comment=comment.values[0] + table_name=self.table, + schema=self.schema, + comment=comment.values[0], ) # Part 2 - write columns comments @@ -418,15 +432,10 @@ def get_table_comments_df(self): if self.conn_type == "mssql": table_comments = self._get_mssql_table_comments() - elif self.conn_type == "postgres": - # Postgres Connection - try: - table_comments = self._get_pg_table_comments() - - # teiid driver - except: - table_comments = self._get_teiid_table_comments() + table_comments = self._get_pg_table_comments() + elif self.conn_type == "teiid": + table_comments = self._get_teiid_table_comments() else: raise NotImplementedError( "Database connection type not implemented. PR for the best." diff --git a/fastetl/custom_functions/utils/teiid.py b/fastetl/custom_functions/utils/teiid.py deleted file mode 100644 index 1aa930a..0000000 --- a/fastetl/custom_functions/utils/teiid.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -Class and functions for Teiid Database Server -""" - - -import pyodbc -import yaml -import os -import pandas as pd -from typing import Tuple -from sqlalchemy import create_engine -from sqlalchemy.engine import Engine, URL -from airflow.hooks.base import BaseHook -from airflow.providers.common.sql.hooks.sql import DbApiHook -from airflow.providers.postgres.hooks.postgres import PostgresHook -from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook -from airflow.providers.mysql.hooks.mysql import MySqlHook -from fastetl.custom_functions.fast_etl import SourceConnection, DestinationConnection -from fastetl.custom_functions.fast_etl import get_conn_type -# class Teiid: -# """ -# Gera as conexões de origem do Quartzo (Teiid) -# """ - -# def __init__(self, conn_id: str): -# self.conn_type = get_conn_type(conn_id) -# self.conn = None -# self.hook, _ = get_hook_and_engine_by_provider(conn_id) - - -def check_is_teiid(conn_id): - sql = "SELECT * FROM SYS.Tables WHERE 1=2" - hook = PostgresHook(conn_id) - check = hook.get_first(sql)[0] - if check: - return True - - return False - - -def _get_tables_n_columns_from_teiid(conn_id) -> pd.DataFrame: - """ - Pega uma lista de metadados das colunas do banco quartzo. 
- """ - - pg_hook = PostgresHook(conn_id) - rows = pg_hook.get_pandas_df( - f"""SELECT - TableName, - Name, - DataType, - Scale, - Length, - IsLengthFixed, - "Precision", - Description - FROM - SYS.Columns - WHERE - VDBName = '{vdb}' - and SchemaName = '{vdb}_VBL' - and TableName IN ('{tables}') - """ - ) - - rows.replace({'"': "", "'": ""}, regex=True, inplace=True) - - return rows - -def map_datatypes(mapping: dict, row: pd.Series, destination: str) -> pd.Series: - - - #row["DataType"] = mapping["teiid"][destination].get(row["DataType"], row["DataType"]) - - if row['DataType'] in mapping["teiid"][destination]: - - - - - rows.loc[index] = row - - return row - -def create_table(source: SourceConnection, destination: DestinationConnection): - df_source_columns = _get_tables_n_columns_from_teiid(source.conn_id) - - df_source_columns["new_width"] = None - current_path = os.path.dirname(__file__) - - mapping = yaml.safe_load( - open(os.path.join(current_path, "config", "types_mapping.yml")) - ) - destination_provider = get_conn_type(destination.conn_id) - - df_dest_columns = df_source_columns.apply(map_datatypes, axis=1, mapping=mapping, destination=destination_provider) - - - -# def __exit__(self, exc_type, exc_value, traceback): -# self.conn.close() diff --git a/setup.py b/setup.py index 9a84002..459f12b 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open("README.md", "r") as fh: long_description = fh.read() -__version__ = "0.0.11" +__version__ = "0.0.12" """Perform the package apache-airflow-providers-fastetl setup.""" setup(
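
Reviewer notes:

A minimal usage sketch of the new `create_table_if_not_exists` entry point.
The connection ids (`my_teiid_src`, `my_pg_dest`) and schema/table names are
hypothetical and assume an Airflow environment where both connections are
configured:

    from fastetl.custom_functions.utils.db_connection import (
        SourceConnection,
        DestinationConnection,
    )
    from fastetl.custom_functions.utils.create_table import (
        create_table_if_not_exists,
    )

    # Hypothetical Airflow connection ids; "my_teiid_src" is a postgres-type
    # connection that actually points at a Teiid VDB.
    source = SourceConnection(
        conn_id="my_teiid_src",
        schema="my_vdb_schema",
        table="my_table",
    )
    destination = DestinationConnection(
        conn_id="my_pg_dest",
        schema="staging",
        table="my_table",
    )

    # No-op when staging.my_table already exists; otherwise the layout is
    # read from Teiid's SYS.Columns (or via SQLAlchemy reflection for other
    # sources) and the table is created on the destination.
    create_table_if_not_exists(source, destination)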
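
The `types_mapping.yml` lookup performed by `_convert_datatypes` can be
exercised in isolation. A self-contained sketch of the same logic, with the
teiid-to-mssql `string` entry inlined as a plain dict and an invented sample
row (values mirror the YAML above):

    import pandas as pd

    # Inlined subset of types_mapping.yml (teiid string -> mssql).
    types_node = {
        "IsLengthFixed": None,  # marker key: dtype is keyed by IsLengthFixed
        "dtype": {True: "CHAR", False: "VARCHAR"},
        "max_length": {8000: "MAX"},
        "length_columns": ["Length"],
    }

    # Invented Teiid SYS.Columns row: a variable-length string of length 9000.
    row = pd.Series(
        {"Name": "description", "DataType": "string",
         "IsLengthFixed": False, "Length": 9000}
    )

    dtype = types_node["dtype"][row["IsLengthFixed"]]  # -> "VARCHAR"
    length_columns = types_node["length_columns"]
    converted_length = f"({row[length_columns[0]]})"   # -> "(9000)"

    # Cap at the destination maximum, as _convert_datatypes does.
    max_length, mapped_length = next(iter(types_node["max_length"].items()))
    if row[length_columns[0]] >= max_length:           # 9000 >= 8000
        converted_length = f"({mapped_length})"        # -> "(MAX)"

    print(f"{row['Name']} {dtype}{converted_length}")  # description VARCHAR(MAX)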
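
For the Teiid path, the end product handed to `_execute_query` is a single
CREATE TABLE statement assembled by `_create_table_ddl`. A sketch of the
output for the invented column above plus a bigint id (schema/table names
hypothetical):

    # Converted DataFrame rows:
    #   Name         DataType  converted_length
    #   id           BIGINT    (empty string)
    #   description  VARCHAR   (MAX)
    #
    # _create_table_ddl then yields:
    ddl = "CREATE TABLE staging.my_table (id BIGINT, description VARCHAR(MAX));"
    print(ddl)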
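
The Teiid detection added to `get_conn_type` hinges on the `SYS.Tables`
system view existing only on Teiid, even though the server speaks the
postgres wire protocol. The same probe outside Airflow, sketched with plain
psycopg2 and a placeholder DSN:

    import psycopg2

    def is_teiid(dsn: str) -> bool:
        """True when the server answers the Teiid-only SYS.Tables probe."""
        with psycopg2.connect(dsn) as conn:
            with conn.cursor() as cur:
                try:
                    # Exists on Teiid; raises UndefinedTable on PostgreSQL.
                    cur.execute("SELECT * FROM SYS.Tables WHERE 1=2")
                    return True
                except psycopg2.errors.UndefinedTable:
                    return False

    # Placeholder credentials; point at a Teiid VDB or a PostgreSQL database.
    print(is_teiid("host=localhost dbname=my_vdb user=user password=secret"))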