Skip to content

Commit

Permalink
create function to create table from teiid
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitor authored and vitorbellini committed Jun 16, 2023
1 parent 0048a4f commit 01aed4e
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 370 deletions.
203 changes: 9 additions & 194 deletions fastetl/custom_functions/fast_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,14 @@
from typing import Union, Tuple, Dict
import logging
import pandas as pd
from pandas.io.sql import DatabaseError
from psycopg2 import OperationalError
from sqlalchemy.exc import NoSuchModuleError
from sqlalchemy import Table, Column, MetaData
from sqlalchemy.engine import reflection
from sqlalchemy.sql import sqltypes as sa_types
import sqlalchemy.dialects as sa_dialects

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

from fastetl.custom_functions.utils.db_connection import (
DbConnection,
SourceConnection,
DestinationConnection,
get_conn_type,
get_mssql_odbc_engine,
get_hook_and_engine_by_provider,
Expand All @@ -31,68 +26,7 @@
from fastetl.custom_functions.utils.get_table_cols_name import (
get_table_cols_name,
)


class SourceConnection:
"""Represents a source connection to a database, encapsulating the
connection details (e.g., connection ID, schema, table, query)
required to read data from a database.
Args:
conn_id (str): The unique identifier of the connection to use.
schema (str, optional): The name of the schema to use.
Default is None.
table (str, optional): The name of the table to use.
Default is None.
query (str, optional): The SQL query to use. Default is None.
Raises:
ValueError: If `conn_id` is empty or if neither `query` nor
(`schema` and `table`) is provided.
Attributes:
conn_id (str): The unique identifier of the connection.
schema (str): The name of the schema.
table (str): The name of the table.
query (str): The SQL query.
"""

def __init__(
self, conn_id: str, schema: str = None, table: str = None, query: str = None
):

if not conn_id:
raise ValueError("conn_id argument cannot be empty")
if not query and not (schema or table):
raise ValueError("must provide either schema and table or query")

self.conn_id = conn_id
self.schema = schema
self.table = table
self.query = query


class DestinationConnection:
"""Represents a destination connection to a database, encapsulating
the connection details (e.g., connection ID, schema, table) required
to write data to a database.
Args:
conn_id (str): The unique identifier of the connection to use.
schema (str): The name of the schema to use.
table (str): The name of the table to use.
Attributes:
conn_id (str): The unique identifier of the connection.
schema (str): The name of the schema.
table (str): The name of the table.
"""

def __init__(self, conn_id: str, schema: str, table: str):

self.conn_id = conn_id
self.schema = schema
self.table = table
from fastetl.custom_functions.utils.create_table import create_table_if_not_exists


def build_select_sql(schema: str, table: str, column_list: str) -> str:
Expand Down Expand Up @@ -161,121 +95,6 @@ def insert_df_to_db(
index=False,
)


def create_table_if_not_exists(
source: SourceConnection,
destination: DestinationConnection,
copy_table_comments: bool,
) -> None:
"""Creates a destination table if it does not already exist and copies
data from a source table to the destination.
Args:
source (SourceConnection): A `SourceConnection` object containing
the connection details for the source database.
destination (DestinationConnection): A `DestinationConnection`
object containing the connection details for the destination
database.
copy_table_comments (bool): A flag indicating whether to copy table
and columns comments/descriptions.
Returns:
None.
Raises:
DatabaseError: If there is an error with the database connection or
query.
OperationalError: If there is an error with the database operation.
NoSuchModuleError: If a required module is missing.
"""
def _check_is_teiid(conn_id):

sql = "SELECT * FROM SYS.Tables WHERE 1=2"
hook = PostgresHook(conn_id)
check = hook.get_first(sql)[0]
if check:
return True

return False

def _convert_column(old_col: Column, db_provider: str) -> Column:
"""Convert column type.
Args:
old_col (Column): Column to convert type.
db_provider (str): Connection type. If `mssql` or `postgres`.
Returns:
Column: Column with converted type.
"""

type_mapping = {
"NUMERIC": sa_types.Numeric(38, 13),
"BIT": sa_types.Boolean(),
}

if db_provider == "mssql":
type_mapping["DATETIME"] = sa_dialects.mssql.DATETIME2()

return Column(
old_col["name"],
type_mapping.get(
str(old_col["type"]._type_affinity()), old_col["type"]._type_affinity()
),
)
if _check_is_teiid(source.conn_id):
create_table_from_teiid
else:
destination_provider = get_conn_type(destination.conn_id)

ERROR_TABLE_DOES_NOT_EXIST = {
"mssql": "Invalid object name",
"postgres": "does not exist",
}
_, source_eng = get_hook_and_engine_by_provider(source.conn_id)
destination_hook, destination_eng = get_hook_and_engine_by_provider(
destination.conn_id
)
try:
destination_hook.get_pandas_df(
f"select * from {destination.schema}.{destination.table} where 1=2"
)
except (DatabaseError, OperationalError, NoSuchModuleError) as db_error:
if not ERROR_TABLE_DOES_NOT_EXIST[destination_provider] in str(db_error):
raise db_error
# Table does not exist so we create it
source_eng.echo = True
try:
insp = reflection.Inspector.from_engine(source_eng)
except AssertionError as e: # pylint: disable=invalid-name
logging.error(
"Cannot create the table automatically from this database."
"Please create the table manually to execute data copying."
)
raise e

generic_columns = insp.get_columns(source.table, source.schema)
dest_columns = [
_convert_column(c, destination_provider) for c in generic_columns
]

destination_meta = MetaData(bind=destination_eng)
Table(
destination.table,
destination_meta,
*dest_columns,
schema=destination.schema,
)

destination_meta.create_all(destination_eng)

if copy_table_comments:
_copy_table_comments(
source=source,
destination=destination,
)


def _copy_table_comments(
source: SourceConnection, destination: DestinationConnection
) -> None:
Expand Down Expand Up @@ -442,22 +261,18 @@ def copy_db_to_db(
destination = DestinationConnection(**destination)

# create table if not exists in destination db
create_table_if_not_exists(
source,
destination,
copy_table_comments,
)
create_table_if_not_exists(source, destination)

source_provider = get_conn_type(source.conn_id)
destination_provider = get_conn_type(destination.conn_id)
if copy_table_comments:
_copy_table_comments(source, destination)

# create connections
with DbConnection(source.conn_id) as source_conn:
with DbConnection(destination.conn_id) as destination_conn:
with source_conn.cursor() as source_cur:
with destination_conn.cursor() as destination_cur:
# Fast etl
if destination_provider == "mssql":
if destination.conn_type == "mssql":
destination_conn.autocommit = False
destination_cur.fast_executemany = True
wildcard_symbol = "?"
Expand Down Expand Up @@ -485,13 +300,13 @@ def copy_db_to_db(
)

# remove quotes for mysql compatibility
if source_provider == "mysql":
if source.conn_type == "mysql":
select_sql = select_sql.replace('"', "")

# truncate stage
if destination_truncate:
destination_cur.execute(truncate)
if destination_provider == "mssql":
if destination.conn_type == "mssql":
destination_cur.commit()

# download data
Expand Down
Loading

0 comments on commit 01aed4e

Please sign in to comment.