From e375961531fda9c77c13c6415634c8822832dfad Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Wed, 20 Nov 2024 13:29:20 +0000 Subject: [PATCH 1/8] admin.py update --- justfile | 2 +- sample.datasets.toml | 2 +- sample.env | 2 +- src/matchbox/admin.py | 14 +++++++++----- src/matchbox/server/base.py | 6 +++++- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/justfile b/justfile index d70cb517..facbdd5e 100644 --- a/justfile +++ b/justfile @@ -1,6 +1,6 @@ # Make datasets table matchbox: - uv run python src/matchbox/admin.py --datasets datasets.toml + uv run python src/matchbox/admin.py datasets.toml # Delete all compiled Python files clean: diff --git a/sample.datasets.toml b/sample.datasets.toml index ccc84a0d..38bb62f8 100644 --- a/sample.datasets.toml +++ b/sample.datasets.toml @@ -1,6 +1,6 @@ [warehouses.pg_warehouse] db_type = "postgresql" -username = "your_username" +user = "your_username" password = "your_password" host = "localhost" port = 5432 diff --git a/sample.env b/sample.env index 999b0986..ed8f5c5b 100644 --- a/sample.env +++ b/sample.env @@ -1,5 +1,5 @@ MB__BATCH_SIZE=250_000 -MB__BACKEND_TYPE=postgres +MB__BACKEND_TYPE=postgresql MB__DATASETS_CONFIG=datasets.toml MB__POSTGRES__HOST= diff --git a/src/matchbox/admin.py b/src/matchbox/admin.py index 725eb1da..00f56c58 100644 --- a/src/matchbox/admin.py +++ b/src/matchbox/admin.py @@ -5,19 +5,21 @@ import tomli from dotenv import find_dotenv, load_dotenv +from matchbox.server import MatchboxDBAdapter, inject_backend from matchbox.server.base import ( - MatchboxSettings, Source, ) from matchbox.server.models import SourceWarehouse +logger = logging.getLogger("admin_pipeline") + dotenv_path = find_dotenv(usecwd=True) load_dotenv(dotenv_path) def load_datasets_from_config(datasets: Path) -> dict[str, Source]: """Loads datasets for indexing from the datasets settings TOML file.""" - config = tomli.load(datasets) + config = tomli.loads(datasets.read_text()) warehouses: dict[str, SourceWarehouse] = {} for alias, warehouse_config in config["warehouses"].items(): @@ -36,12 +38,14 @@ def load_datasets_from_config(datasets: Path) -> dict[str, Source]: @click.argument( "datasets", type=click.Path(exists=True, dir_okay=False, path_type=Path) ) -def make_cmf(datasets: Path) -> None: - backend = MatchboxSettings().backend +@inject_backend +def make_cmf(backend: MatchboxDBAdapter, datasets: Path) -> None: dataset_dict = load_datasets_from_config(datasets=datasets) for dataset in dataset_dict.values(): - backend.index(dataset=dataset, engine=dataset.database.engine) + logger.info(f"Indexing {dataset}") + backend.index(dataset=dataset) + logger.info(f"Finished indexing {dataset}") if __name__ == "__main__": diff --git a/src/matchbox/server/base.py b/src/matchbox/server/base.py index 9e17a81d..cdd17825 100644 --- a/src/matchbox/server/base.py +++ b/src/matchbox/server/base.py @@ -14,6 +14,7 @@ cast, ) +from dotenv import find_dotenv, load_dotenv from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict from rustworkx import PyDiGraph @@ -36,6 +37,9 @@ Results = Any +dotenv_path = find_dotenv(usecwd=True) +load_dotenv(dotenv_path, override=True) + R = TypeVar("R") P = ParamSpec("P") @@ -43,7 +47,7 @@ class MatchboxBackends(StrEnum): """The available backends for Matchbox.""" - POSTGRES = "postgres" + POSTGRES = "postgresql" class MatchboxSettings(BaseSettings): From 7a23b5a4efe7e2213df6cd88dc41314ad77607c7 Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Fri, 29 Nov 2024 10:34:26 +0000 Subject: [PATCH 2/8] Logging for benchmarking --- src/matchbox/admin.py | 4 +++- src/matchbox/common/hash.py | 10 ++++++++ src/matchbox/helpers/selector.py | 11 ++++++++- src/matchbox/server/models.py | 23 ++++++++++++++++++- src/matchbox/server/postgresql/adapter.py | 6 +++++ .../server/postgresql/utils/insert.py | 8 ++++++- src/matchbox/server/postgresql/utils/query.py | 12 ++++++++++ 7 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/matchbox/admin.py b/src/matchbox/admin.py index 00f56c58..0d538cc7 100644 --- a/src/matchbox/admin.py +++ b/src/matchbox/admin.py @@ -1,5 +1,6 @@ import logging from pathlib import Path +from datetime import datetime import click import tomli @@ -50,7 +51,8 @@ def make_cmf(backend: MatchboxDBAdapter, datasets: Path) -> None: if __name__ == "__main__": logging.basicConfig( + filename=f".logs/admin_pipeline_{datetime.now()}.log", level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + format="%(asctime)s - %(name)s - %(filename)s->%(funcName)s():%(lineno)s - %(levelname)s - %(message)s", ) make_cmf() diff --git a/src/matchbox/common/hash.py b/src/matchbox/common/hash.py index 3150f95b..577bcffe 100644 --- a/src/matchbox/common/hash.py +++ b/src/matchbox/common/hash.py @@ -16,10 +16,16 @@ HASH_FUNC = hashlib.sha256 +# TEMPORARY +import logging + +logger = logging.getLogger("cmf_pipelines") + def dataset_to_hashlist(dataset: Source, model_hash: bytes) -> list[dict[str, Any]]: """Retrieve and hash a dataset from its warehouse, ready to be inserted.""" with Session(dataset.database.engine) as warehouse_session: + logger.info("Matchbox: Start of dataset_to_hashlist") source_table = dataset.to_table() # Exclude the primary key from the columns to be hashed @@ -27,13 +33,16 @@ def dataset_to_hashlist(dataset: Source, model_hash: bytes) -> list[dict[str, An [col for col in list(source_table.c.keys()) if col != dataset.db_pk] ) + logger.info("Matchbox: select()") slct_stmt = select( func.concat(*source_table.c[cols]).label("raw"), func.array_agg(source_table.c[dataset.db_pk].cast(String)).label("id"), ).group_by(*source_table.c[cols]) + logger.info("Matchbox: execute() slct_stmt with array_agg of cols") raw_result = warehouse_session.execute(slct_stmt) + logger.info("Matchbox: hash_data()") to_insert = [ { "hash": hash_data(data.raw), @@ -43,6 +52,7 @@ def dataset_to_hashlist(dataset: Source, model_hash: bytes) -> list[dict[str, An for data in raw_result.all() ] + logger.info("Matchbox: End of dataset_to_hashlist") return to_insert diff --git a/src/matchbox/helpers/selector.py b/src/matchbox/helpers/selector.py index 41b3632b..57301ef6 100644 --- a/src/matchbox/helpers/selector.py +++ b/src/matchbox/helpers/selector.py @@ -8,6 +8,11 @@ from matchbox.server import MatchboxDBAdapter, inject_backend from matchbox.server.models import Source +# TEMPORARY +import logging + +logger = logging.getLogger("cmf_pipelines") + @inject_backend def selector( @@ -26,10 +31,14 @@ def selector( Returns: A dictionary of the validated Source and fields """ + logger.info("Matchbox: Start of selector()") + logger.info("Matchbox: get_schema_table_names()") db_schema, db_table = get_schema_table_names(table, validate=True) + logger.info("Matchbox: backend.get_dataset()") dataset = backend.get_dataset(db_schema=db_schema, db_table=db_table, engine=engine) # Validate the fields + logger.info("Matchbox: inspect()") inspector = inspect(engine) all_cols = set( column["name"] @@ -38,7 +47,7 @@ def selector( selected_cols = set(fields) if not selected_cols <= all_cols: raise ValueError(f"{selected_cols.difference(all_cols)} not found in {dataset}") - + logger.info("Matchbox: End of selector()") return {dataset: fields} diff --git a/src/matchbox/server/models.py b/src/matchbox/server/models.py index 4c63ff11..9b290caa 100644 --- a/src/matchbox/server/models.py +++ b/src/matchbox/server/models.py @@ -20,6 +20,11 @@ T = TypeVar("T") +# TEMPORARY +import logging + +logger = logging.getLogger("cmf_pipelines") + class Probability(BaseModel): """A probability of a match in the Matchbox database. @@ -126,8 +131,12 @@ def __hash__(self) -> int: def to_table(self) -> Table: """Returns the dataset as a SQLAlchemy Table object.""" + logger.info("Matchbox: Start of to_table()") + logger.info("Matchbox: MetaData()") metadata = MetaData(schema=self.db_schema) + logger.info("Matchbox: Table()") table = Table(self.db_table, metadata, autoload_with=self.database.engine) + logger.info("Matchbox: End of to_table()") return table def _select( @@ -154,10 +163,19 @@ def _select( def to_hash(self) -> bytes: """Generate a unique hash based on the table's columns and datatypes.""" + logger.info("Matchbox: Start of to_hash()") + logger.info("Matchbox: to_table()") table = self.to_table() - schema_representation = ",".join( + + # Original + # schema_representation = ",".join( + # f"{col.name}:{str(col.type)}" for col in table.columns + # ) + + schema_representation = f"{str(self)}: " + ",".join( f"{col.name}:{str(col.type)}" for col in table.columns ) + logger.info("Matchbox: End of to_hash()") return HASH_FUNC(schema_representation.encode("utf-8")).digest() def to_arrow( @@ -177,5 +195,8 @@ def to_pandas( limit: int | None = None, ) -> DataFrame: """Returns the dataset as a pandas DataFrame.""" + logger.info("Matchbox: Start of to_pandas()") + logger.info("Matchbox: _select()") stmt = self._select(fields=fields, pks=pks, limit=limit) + logger.info("Matchbox: sql_to_df") return sql_to_df(stmt, self.database.engine, return_type="pandas") diff --git a/src/matchbox/server/postgresql/adapter.py b/src/matchbox/server/postgresql/adapter.py index e7f56127..0fab04f1 100644 --- a/src/matchbox/server/postgresql/adapter.py +++ b/src/matchbox/server/postgresql/adapter.py @@ -34,6 +34,11 @@ get_model_probabilities, ) +# TEMPORARY +import logging + +logger = logging.getLogger("cmf_pipelines") + if TYPE_CHECKING: from pandas import DataFrame as PandasDataFrame from polars import DataFrame as PolarsDataFrame @@ -299,6 +304,7 @@ def index(self, dataset: Source) -> None: dataset: The dataset to index. engine: The SQLAlchemy engine of your data warehouse. """ + logger.info("Matchbox: insert_dataset()") insert_dataset( dataset=dataset, engine=MBDB.get_engine(), diff --git a/src/matchbox/server/postgresql/utils/insert.py b/src/matchbox/server/postgresql/utils/insert.py index 65d8b8f3..2a9bd2a4 100644 --- a/src/matchbox/server/postgresql/utils/insert.py +++ b/src/matchbox/server/postgresql/utils/insert.py @@ -30,6 +30,7 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: """Indexes a dataset from your data warehouse within Matchbox.""" + logic_logger.info("Matchbox: Start of insert_dataset()") db_logger = logging.getLogger("sqlalchemy.engine") db_logger.setLevel(logging.WARNING) @@ -37,6 +38,7 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: # Insert dataset # ################## + logic_logger.info("Matchbox: to_hash()") model_hash = dataset.to_hash() model_data = { @@ -52,6 +54,7 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: "id": dataset.db_pk, } + logic_logger.info("Matchbox: dataset_to_hashlist()") clusters = dataset_to_hashlist(dataset=dataset, model_hash=model_hash) with engine.connect() as conn: @@ -59,6 +62,7 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: # Upsert into Models table models_stmt = insert(Models).values([model_data]) + logic_logger.info("Matchbox: on_conflict_do_update()") models_stmt = models_stmt.on_conflict_do_update( index_elements=["hash"], set_={ @@ -80,13 +84,15 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: "id": sources_stmt.excluded.id, }, ) - conn.execute(sources_stmt) + logic_logger.info("Matchbox: sources_stmt") + result = conn.execute(sources_stmt) conn.commit() logic_logger.info(f"{dataset} added to Sources table") # Upsert into Clusters table + logic_logger.info("Matchbox: batch_ingest()") batch_ingest( records=[(clus["hash"], clus["dataset"], clus["id"]) for clus in clusters], table=Clusters, diff --git a/src/matchbox/server/postgresql/utils/query.py b/src/matchbox/server/postgresql/utils/query.py index 18add154..f9ad0443 100644 --- a/src/matchbox/server/postgresql/utils/query.py +++ b/src/matchbox/server/postgresql/utils/query.py @@ -31,6 +31,9 @@ logic_logger = logging.getLogger("mb_logic") +# Temporary +pipelines_logger = logging.getLogger("cmf_pipelines") + def hash_to_hex_decode(hash: bytes) -> bytes: """A workround for PostgreSQL so we can compile the query and use ConnectorX.""" @@ -258,6 +261,7 @@ def query( A table containing the requested data from each table, unioned together, with the hash key of each row in Matchbox, in the requested return type """ + pipelines_logger.info("Matchbox: Start of query()") tables: list[pa.Table] = [] if limit: @@ -266,6 +270,7 @@ def query( with Session(engine) as session: # If a model was specified, validate and retrieve it + pipelines_logger.info("Matchbox: query() - Retrive model") truth_model = None if model is not None: truth_model = session.query(Models).filter(Models.name == model).first() @@ -275,6 +280,7 @@ def query( # Process each source dataset for source, fields in selector.items(): # Get the dataset model + pipelines_logger.info("Matchbox: query() - Retrieve dataset model") dataset = ( session.query(Models) .join(Sources, Sources.model == Models.hash) @@ -291,6 +297,7 @@ def query( db_schema=source.db_schema, db_table=source.db_table ) + pipelines_logger.info("Matchbox: _resolve_cluster_hierarchy()") hash_query = _resolve_cluster_hierarchy( dataset_hash=dataset.hash, model=truth_model if truth_model else dataset, @@ -305,14 +312,17 @@ def query( limit_remainder -= 1 hash_query = hash_query.limit(limit_base + remain) + pipelines_logger.info("Matchbox: sql_to_df()") # Get cluster assignments mb_hashes = sql_to_df(hash_query, engine, return_type="arrow") + pipelines_logger.info("Matchbox: to_arrow()") # Get source data raw_data = source.to_arrow( fields=set([source.db_pk] + fields), pks=mb_hashes["id"].to_pylist() ) + pipelines_logger.info("Matchbox: key_to_sqlalchemy_label()") # Join and select columns joined_table = raw_data.join( right_table=mb_hashes, @@ -321,6 +331,7 @@ def query( join_type="inner", ) + pipelines_logger.info("Matchbox: key_to_sqlalchemy_label() - keep cols") keep_cols = ["hash"] + [key_to_sqlalchemy_label(f, source) for f in fields] match_cols = [col for col in joined_table.column_names if col in keep_cols] @@ -329,6 +340,7 @@ def query( # Combine results result = pa.concat_tables(tables, promote_options="default") + pipelines_logger.info("Matchbox: End of query()") # Return in requested format if return_type == "arrow": return result From 5129f94e847e00a7d740082cb64e65a5e3ba689e Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Mon, 2 Dec 2024 13:03:24 +0000 Subject: [PATCH 3/8] index_dataset added to admin.py --- justfile | 2 +- src/matchbox/admin.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/justfile b/justfile index facbdd5e..d70cb517 100644 --- a/justfile +++ b/justfile @@ -1,6 +1,6 @@ # Make datasets table matchbox: - uv run python src/matchbox/admin.py datasets.toml + uv run python src/matchbox/admin.py --datasets datasets.toml # Delete all compiled Python files clean: diff --git a/src/matchbox/admin.py b/src/matchbox/admin.py index 0d538cc7..c93dda4d 100644 --- a/src/matchbox/admin.py +++ b/src/matchbox/admin.py @@ -18,6 +18,11 @@ load_dotenv(dotenv_path) +@inject_backend +def index_dataset(backend: MatchboxDBAdapter, dataset: Source) -> None: + backend.index(dataset=dataset, engine=dataset.database.engine) + + def load_datasets_from_config(datasets: Path) -> dict[str, Source]: """Loads datasets for indexing from the datasets settings TOML file.""" config = tomli.loads(datasets.read_text()) @@ -40,19 +45,17 @@ def load_datasets_from_config(datasets: Path) -> dict[str, Source]: "datasets", type=click.Path(exists=True, dir_okay=False, path_type=Path) ) @inject_backend -def make_cmf(backend: MatchboxDBAdapter, datasets: Path) -> None: - dataset_dict = load_datasets_from_config(datasets=datasets) - - for dataset in dataset_dict.values(): +def make_matchbox(backend: MatchboxDBAdapter, datasets: Path) -> None: + + for dataset in datasets: logger.info(f"Indexing {dataset}") - backend.index(dataset=dataset) + index_dataset(Source(dataset)) logger.info(f"Finished indexing {dataset}") if __name__ == "__main__": logging.basicConfig( - filename=f".logs/admin_pipeline_{datetime.now()}.log", level=logging.INFO, - format="%(asctime)s - %(name)s - %(filename)s->%(funcName)s():%(lineno)s - %(levelname)s - %(message)s", + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) - make_cmf() + make_matchbox() From c01d23dec531d48d54fa7919e6f7b6bdb36922f2 Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Mon, 2 Dec 2024 13:33:32 +0000 Subject: [PATCH 4/8] Remove logging --- src/matchbox/admin.py | 10 +++++----- src/matchbox/common/hash.py | 10 ---------- src/matchbox/helpers/selector.py | 11 +--------- src/matchbox/server/models.py | 20 ------------------- src/matchbox/server/postgresql/adapter.py | 6 ------ .../server/postgresql/utils/insert.py | 8 +------- src/matchbox/server/postgresql/utils/query.py | 12 ----------- 7 files changed, 7 insertions(+), 70 deletions(-) diff --git a/src/matchbox/admin.py b/src/matchbox/admin.py index c93dda4d..f153bc10 100644 --- a/src/matchbox/admin.py +++ b/src/matchbox/admin.py @@ -1,6 +1,5 @@ import logging from pathlib import Path -from datetime import datetime import click import tomli @@ -20,7 +19,7 @@ @inject_backend def index_dataset(backend: MatchboxDBAdapter, dataset: Source) -> None: - backend.index(dataset=dataset, engine=dataset.database.engine) + backend.index(dataset=dataset) def load_datasets_from_config(datasets: Path) -> dict[str, Source]: @@ -46,10 +45,11 @@ def load_datasets_from_config(datasets: Path) -> dict[str, Source]: ) @inject_backend def make_matchbox(backend: MatchboxDBAdapter, datasets: Path) -> None: - - for dataset in datasets: + dataset_dict = load_datasets_from_config(datasets=datasets) + + for dataset in dataset_dict.values(): logger.info(f"Indexing {dataset}") - index_dataset(Source(dataset)) + index_dataset(dataset) logger.info(f"Finished indexing {dataset}") diff --git a/src/matchbox/common/hash.py b/src/matchbox/common/hash.py index 577bcffe..3150f95b 100644 --- a/src/matchbox/common/hash.py +++ b/src/matchbox/common/hash.py @@ -16,16 +16,10 @@ HASH_FUNC = hashlib.sha256 -# TEMPORARY -import logging - -logger = logging.getLogger("cmf_pipelines") - def dataset_to_hashlist(dataset: Source, model_hash: bytes) -> list[dict[str, Any]]: """Retrieve and hash a dataset from its warehouse, ready to be inserted.""" with Session(dataset.database.engine) as warehouse_session: - logger.info("Matchbox: Start of dataset_to_hashlist") source_table = dataset.to_table() # Exclude the primary key from the columns to be hashed @@ -33,16 +27,13 @@ def dataset_to_hashlist(dataset: Source, model_hash: bytes) -> list[dict[str, An [col for col in list(source_table.c.keys()) if col != dataset.db_pk] ) - logger.info("Matchbox: select()") slct_stmt = select( func.concat(*source_table.c[cols]).label("raw"), func.array_agg(source_table.c[dataset.db_pk].cast(String)).label("id"), ).group_by(*source_table.c[cols]) - logger.info("Matchbox: execute() slct_stmt with array_agg of cols") raw_result = warehouse_session.execute(slct_stmt) - logger.info("Matchbox: hash_data()") to_insert = [ { "hash": hash_data(data.raw), @@ -52,7 +43,6 @@ def dataset_to_hashlist(dataset: Source, model_hash: bytes) -> list[dict[str, An for data in raw_result.all() ] - logger.info("Matchbox: End of dataset_to_hashlist") return to_insert diff --git a/src/matchbox/helpers/selector.py b/src/matchbox/helpers/selector.py index 57301ef6..41b3632b 100644 --- a/src/matchbox/helpers/selector.py +++ b/src/matchbox/helpers/selector.py @@ -8,11 +8,6 @@ from matchbox.server import MatchboxDBAdapter, inject_backend from matchbox.server.models import Source -# TEMPORARY -import logging - -logger = logging.getLogger("cmf_pipelines") - @inject_backend def selector( @@ -31,14 +26,10 @@ def selector( Returns: A dictionary of the validated Source and fields """ - logger.info("Matchbox: Start of selector()") - logger.info("Matchbox: get_schema_table_names()") db_schema, db_table = get_schema_table_names(table, validate=True) - logger.info("Matchbox: backend.get_dataset()") dataset = backend.get_dataset(db_schema=db_schema, db_table=db_table, engine=engine) # Validate the fields - logger.info("Matchbox: inspect()") inspector = inspect(engine) all_cols = set( column["name"] @@ -47,7 +38,7 @@ def selector( selected_cols = set(fields) if not selected_cols <= all_cols: raise ValueError(f"{selected_cols.difference(all_cols)} not found in {dataset}") - logger.info("Matchbox: End of selector()") + return {dataset: fields} diff --git a/src/matchbox/server/models.py b/src/matchbox/server/models.py index 9b290caa..62603a00 100644 --- a/src/matchbox/server/models.py +++ b/src/matchbox/server/models.py @@ -20,11 +20,6 @@ T = TypeVar("T") -# TEMPORARY -import logging - -logger = logging.getLogger("cmf_pipelines") - class Probability(BaseModel): """A probability of a match in the Matchbox database. @@ -131,12 +126,8 @@ def __hash__(self) -> int: def to_table(self) -> Table: """Returns the dataset as a SQLAlchemy Table object.""" - logger.info("Matchbox: Start of to_table()") - logger.info("Matchbox: MetaData()") metadata = MetaData(schema=self.db_schema) - logger.info("Matchbox: Table()") table = Table(self.db_table, metadata, autoload_with=self.database.engine) - logger.info("Matchbox: End of to_table()") return table def _select( @@ -163,19 +154,11 @@ def _select( def to_hash(self) -> bytes: """Generate a unique hash based on the table's columns and datatypes.""" - logger.info("Matchbox: Start of to_hash()") - logger.info("Matchbox: to_table()") table = self.to_table() - # Original - # schema_representation = ",".join( - # f"{col.name}:{str(col.type)}" for col in table.columns - # ) - schema_representation = f"{str(self)}: " + ",".join( f"{col.name}:{str(col.type)}" for col in table.columns ) - logger.info("Matchbox: End of to_hash()") return HASH_FUNC(schema_representation.encode("utf-8")).digest() def to_arrow( @@ -195,8 +178,5 @@ def to_pandas( limit: int | None = None, ) -> DataFrame: """Returns the dataset as a pandas DataFrame.""" - logger.info("Matchbox: Start of to_pandas()") - logger.info("Matchbox: _select()") stmt = self._select(fields=fields, pks=pks, limit=limit) - logger.info("Matchbox: sql_to_df") return sql_to_df(stmt, self.database.engine, return_type="pandas") diff --git a/src/matchbox/server/postgresql/adapter.py b/src/matchbox/server/postgresql/adapter.py index 0fab04f1..e7f56127 100644 --- a/src/matchbox/server/postgresql/adapter.py +++ b/src/matchbox/server/postgresql/adapter.py @@ -34,11 +34,6 @@ get_model_probabilities, ) -# TEMPORARY -import logging - -logger = logging.getLogger("cmf_pipelines") - if TYPE_CHECKING: from pandas import DataFrame as PandasDataFrame from polars import DataFrame as PolarsDataFrame @@ -304,7 +299,6 @@ def index(self, dataset: Source) -> None: dataset: The dataset to index. engine: The SQLAlchemy engine of your data warehouse. """ - logger.info("Matchbox: insert_dataset()") insert_dataset( dataset=dataset, engine=MBDB.get_engine(), diff --git a/src/matchbox/server/postgresql/utils/insert.py b/src/matchbox/server/postgresql/utils/insert.py index 2a9bd2a4..65d8b8f3 100644 --- a/src/matchbox/server/postgresql/utils/insert.py +++ b/src/matchbox/server/postgresql/utils/insert.py @@ -30,7 +30,6 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: """Indexes a dataset from your data warehouse within Matchbox.""" - logic_logger.info("Matchbox: Start of insert_dataset()") db_logger = logging.getLogger("sqlalchemy.engine") db_logger.setLevel(logging.WARNING) @@ -38,7 +37,6 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: # Insert dataset # ################## - logic_logger.info("Matchbox: to_hash()") model_hash = dataset.to_hash() model_data = { @@ -54,7 +52,6 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: "id": dataset.db_pk, } - logic_logger.info("Matchbox: dataset_to_hashlist()") clusters = dataset_to_hashlist(dataset=dataset, model_hash=model_hash) with engine.connect() as conn: @@ -62,7 +59,6 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: # Upsert into Models table models_stmt = insert(Models).values([model_data]) - logic_logger.info("Matchbox: on_conflict_do_update()") models_stmt = models_stmt.on_conflict_do_update( index_elements=["hash"], set_={ @@ -84,15 +80,13 @@ def insert_dataset(dataset: Source, engine: Engine, batch_size: int) -> None: "id": sources_stmt.excluded.id, }, ) - logic_logger.info("Matchbox: sources_stmt") - result = conn.execute(sources_stmt) + conn.execute(sources_stmt) conn.commit() logic_logger.info(f"{dataset} added to Sources table") # Upsert into Clusters table - logic_logger.info("Matchbox: batch_ingest()") batch_ingest( records=[(clus["hash"], clus["dataset"], clus["id"]) for clus in clusters], table=Clusters, diff --git a/src/matchbox/server/postgresql/utils/query.py b/src/matchbox/server/postgresql/utils/query.py index f9ad0443..18add154 100644 --- a/src/matchbox/server/postgresql/utils/query.py +++ b/src/matchbox/server/postgresql/utils/query.py @@ -31,9 +31,6 @@ logic_logger = logging.getLogger("mb_logic") -# Temporary -pipelines_logger = logging.getLogger("cmf_pipelines") - def hash_to_hex_decode(hash: bytes) -> bytes: """A workround for PostgreSQL so we can compile the query and use ConnectorX.""" @@ -261,7 +258,6 @@ def query( A table containing the requested data from each table, unioned together, with the hash key of each row in Matchbox, in the requested return type """ - pipelines_logger.info("Matchbox: Start of query()") tables: list[pa.Table] = [] if limit: @@ -270,7 +266,6 @@ def query( with Session(engine) as session: # If a model was specified, validate and retrieve it - pipelines_logger.info("Matchbox: query() - Retrive model") truth_model = None if model is not None: truth_model = session.query(Models).filter(Models.name == model).first() @@ -280,7 +275,6 @@ def query( # Process each source dataset for source, fields in selector.items(): # Get the dataset model - pipelines_logger.info("Matchbox: query() - Retrieve dataset model") dataset = ( session.query(Models) .join(Sources, Sources.model == Models.hash) @@ -297,7 +291,6 @@ def query( db_schema=source.db_schema, db_table=source.db_table ) - pipelines_logger.info("Matchbox: _resolve_cluster_hierarchy()") hash_query = _resolve_cluster_hierarchy( dataset_hash=dataset.hash, model=truth_model if truth_model else dataset, @@ -312,17 +305,14 @@ def query( limit_remainder -= 1 hash_query = hash_query.limit(limit_base + remain) - pipelines_logger.info("Matchbox: sql_to_df()") # Get cluster assignments mb_hashes = sql_to_df(hash_query, engine, return_type="arrow") - pipelines_logger.info("Matchbox: to_arrow()") # Get source data raw_data = source.to_arrow( fields=set([source.db_pk] + fields), pks=mb_hashes["id"].to_pylist() ) - pipelines_logger.info("Matchbox: key_to_sqlalchemy_label()") # Join and select columns joined_table = raw_data.join( right_table=mb_hashes, @@ -331,7 +321,6 @@ def query( join_type="inner", ) - pipelines_logger.info("Matchbox: key_to_sqlalchemy_label() - keep cols") keep_cols = ["hash"] + [key_to_sqlalchemy_label(f, source) for f in fields] match_cols = [col for col in joined_table.column_names if col in keep_cols] @@ -340,7 +329,6 @@ def query( # Combine results result = pa.concat_tables(tables, promote_options="default") - pipelines_logger.info("Matchbox: End of query()") # Return in requested format if return_type == "arrow": return result From c52f8e5aa2df27b4b87825f4ff7207152e0f8fb2 Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Mon, 2 Dec 2024 14:05:49 +0000 Subject: [PATCH 5/8] Update github workflow env --- .github/workflows/pytest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 57fa983f..5dcce43f 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -7,7 +7,7 @@ on: env: MB__BATCH_SIZE: 10_000 - MB__BACKEND_TYPE: postgres + MB__BACKEND_TYPE: postgresql MB__DATASETS_CONFIG: datasets.toml # PostgreSQL backend settings MB__POSTGRES__HOST: localhost From 5e71c0e875fb6278d70475df827f06085771e1a1 Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Mon, 2 Dec 2024 14:40:50 +0000 Subject: [PATCH 6/8] Admin logger rename --- src/matchbox/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/matchbox/admin.py b/src/matchbox/admin.py index f153bc10..2a93d292 100644 --- a/src/matchbox/admin.py +++ b/src/matchbox/admin.py @@ -11,7 +11,7 @@ ) from matchbox.server.models import SourceWarehouse -logger = logging.getLogger("admin_pipeline") +logger = logging.getLogger("mb_logic") dotenv_path = find_dotenv(usecwd=True) load_dotenv(dotenv_path) From 490fb9a65849decf1f4f38644b249761d0eb4dc1 Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Mon, 2 Dec 2024 15:18:16 +0000 Subject: [PATCH 7/8] Rename to postgres --- .github/workflows/pytest.yml | 2 +- sample.env | 2 +- src/matchbox/server/base.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 5dcce43f..57fa983f 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -7,7 +7,7 @@ on: env: MB__BATCH_SIZE: 10_000 - MB__BACKEND_TYPE: postgresql + MB__BACKEND_TYPE: postgres MB__DATASETS_CONFIG: datasets.toml # PostgreSQL backend settings MB__POSTGRES__HOST: localhost diff --git a/sample.env b/sample.env index ed8f5c5b..999b0986 100644 --- a/sample.env +++ b/sample.env @@ -1,5 +1,5 @@ MB__BATCH_SIZE=250_000 -MB__BACKEND_TYPE=postgresql +MB__BACKEND_TYPE=postgres MB__DATASETS_CONFIG=datasets.toml MB__POSTGRES__HOST= diff --git a/src/matchbox/server/base.py b/src/matchbox/server/base.py index cdd17825..9b472f47 100644 --- a/src/matchbox/server/base.py +++ b/src/matchbox/server/base.py @@ -47,7 +47,7 @@ class MatchboxBackends(StrEnum): """The available backends for Matchbox.""" - POSTGRES = "postgresql" + POSTGRES = "postgres" class MatchboxSettings(BaseSettings): From 52efea4a216cd1972e03aafbfd6ecf4f879c8920 Mon Sep 17 00:00:00 2001 From: Sophie Glinton Date: Mon, 2 Dec 2024 15:20:40 +0000 Subject: [PATCH 8/8] Sample warehouse config update --- sample.datasets.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sample.datasets.toml b/sample.datasets.toml index 38bb62f8..e0dda57e 100644 --- a/sample.datasets.toml +++ b/sample.datasets.toml @@ -1,10 +1,10 @@ [warehouses.pg_warehouse] db_type = "postgresql" -user = "your_username" -password = "your_password" +database = "warehouse" +user = "warehouse_user" +password = "warehouse_password" host = "localhost" -port = 5432 -database = "companies_db" +port = 7654 [datasets.companies_house] database = "pg_warehouse"