Skip to content

Commit

Permalink
Use mb_server_logger and mb_client_logger in place of mb_logic_logger and client_logger
Browse files (browse the repository at this point in the history)
  • Loading branch information
dejandbt committed Feb 28, 2025
1 parent dc7da9d commit b9f1c20
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/matchbox/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from matchbox.common.exceptions import MatchboxClientSettingsException
from matchbox.common.logging import mb_logic_logger
from matchbox.common.logging import mb_server_logger

try:
# Environment variables must be loaded first for other imports to work
from matchbox.client import * # noqa: E402, F403
except MatchboxClientSettingsException:
mb_logic_logger.warning(
mb_server_logger.warning(
"Impossible to initialise client. "
"Please ignore if running in server mode. Otherwise, check your .env file.",
)
6 changes: 0 additions & 6 deletions src/matchbox/client/_logging.py

This file was deleted.

4 changes: 2 additions & 2 deletions src/matchbox/client/helpers/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from sqlalchemy import Engine, create_engine

from matchbox.client import _handler
from matchbox.client._logging import client_logger
from matchbox.client._settings import settings
from matchbox.common.graph import DEFAULT_RESOLUTION
from matchbox.common.logging import mb_client_logger
from matchbox.common.sources import Match, Source, SourceAddress


Expand Down Expand Up @@ -47,7 +47,7 @@ def select(
if not engine:
if default_engine := settings.default_warehouse:
engine = create_engine(default_engine)
client_logger.warning("Using default engine")
mb_client_logger.warning("Using default engine")
else:
raise ValueError(
"An engine needs to be provided if "
Expand Down
4 changes: 2 additions & 2 deletions src/matchbox/client/models/linkers/splinklinker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from splink.internals.linker_components.training import LinkerTraining

from matchbox.client.models.linkers.base import Linker, LinkerSettings
from matchbox.common.logging import mb_logic_logger
from matchbox.common.logging import mb_client_logger


class SplinkLinkerFunction(BaseModel):
Expand Down Expand Up @@ -217,7 +217,7 @@ def prepare(self, left: DataFrame, right: DataFrame) -> None:

def link(self, left: DataFrame = None, right: DataFrame = None) -> DataFrame:
if left is not None or right is not None:
mb_logic_logger.warning(
mb_client_logger.warning(
"Left and right data are declared in .prepare() for SplinkLinker. "
"These values will be ignored"
)
Expand Down
4 changes: 3 additions & 1 deletion src/matchbox/common/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def get_logger(name: str, custom_format: str = None) -> logging.Logger:
return logger


mb_logic_logger = get_logger("mb_logic")
mb_server_logger = get_logger("mb_server_logger")
mb_client_logger = get_logger("mb_client_logger")
mb_test_logger = get_logger("mb_test_logger")


def get_console():
Expand Down
12 changes: 6 additions & 6 deletions src/matchbox/common/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import rustworkx as rx

from matchbox.common.hash import hash_values
from matchbox.common.logging import build_progress_bar, mb_logic_logger
from matchbox.common.logging import build_progress_bar, mb_server_logger

T = TypeVar("T", bound=Hashable)

Expand Down Expand Up @@ -319,7 +319,7 @@ def to_hierarchical_clusters(
n_cores = multiprocessing.cpu_count()
n_components = len(components)

mb_logic_logger.info(
mb_server_logger.info(
f"Processing {n_components:,} components using {n_cores} workers"
)

Expand Down Expand Up @@ -369,12 +369,12 @@ def to_hierarchical_clusters(
results.append(result)
progress.update(process_task, advance=1)
except TimeoutError:
mb_logic_logger.error(
mb_server_logger.error(
f"Component processing timed out after {timeout} seconds"
)
raise
except Exception as e:
mb_logic_logger.error(f"Error processing component: {str(e)}")
mb_server_logger.error(f"Error processing component: {str(e)}")
raise
else:
results = [
Expand All @@ -383,13 +383,13 @@ def to_hierarchical_clusters(
if not progress.update(process_task, advance=1)
]

mb_logic_logger.info(
mb_server_logger.info(
f"Completed processing {len(results):,} components successfully"
)

# Create empty table if no results
if not results:
mb_logic_logger.warning("No results to concatenate")
mb_server_logger.warning("No results to concatenate")
return pa.table(
{
"parent": pa.array([], type=dtype()),
Expand Down
40 changes: 20 additions & 20 deletions src/matchbox/server/postgresql/utils/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from matchbox.common.db import sql_to_df
from matchbox.common.graph import ResolutionNodeType
from matchbox.common.hash import hash_values
from matchbox.common.logging import WARNING, get_logger, mb_logic_logger
from matchbox.common.logging import WARNING, get_logger, mb_server_logger
from matchbox.common.sources import Source
from matchbox.common.transform import (
attach_components_to_probabilities,
Expand Down Expand Up @@ -124,7 +124,7 @@ def insert_dataset(
}

with engine.connect() as conn:
mb_logic_logger.info(f"Adding {source}")
mb_server_logger.info(f"Adding {source}")

# Generate existing max primary key values
next_cluster_id = Clusters.next_id()
Expand All @@ -144,7 +144,7 @@ def insert_dataset(
resolution_stmt = resolution_stmt.on_conflict_do_nothing()
conn.execute(resolution_stmt)

mb_logic_logger.info(f"{source} added to Resolutions table")
mb_server_logger.info(f"{source} added to Resolutions table")

# Upsert into Sources table
sources_stmt = insert(Sources).values([source_data])
Expand All @@ -153,7 +153,7 @@ def insert_dataset(

conn.commit()

mb_logic_logger.info(f"{source} added to Sources table")
mb_server_logger.info(f"{source} added to Sources table")

existing_hashes_statement = (
select(Clusters.cluster_hash)
Expand Down Expand Up @@ -196,11 +196,11 @@ def insert_dataset(
# Some edge cases, defined in tests, are not implemented yet
raise NotImplementedError from e

mb_logic_logger.info(
mb_server_logger.info(
f"{source} added {len(data_hashes)} objects to Clusters table"
)

mb_logic_logger.info(f"Finished {source}")
mb_server_logger.info(f"Finished {source}")


def insert_model(
Expand All @@ -223,7 +223,7 @@ def insert_model(
MatchboxResolutionNotFoundError: If the specified parent models don't exist.
MatchboxResolutionNotFoundError: If the specified model doesn't exist.
"""
mb_logic_logger.info(f"[{model}] Registering model")
mb_server_logger.info(f"[{model}] Registering model")
with Session(engine) as session:
resolution_hash = hash_values(
left.resolution_hash,
Expand Down Expand Up @@ -302,8 +302,8 @@ def _create_closure_entries(parent_resolution: Resolutions) -> None:
session.commit()

status = "Inserted new" if not exists else "Updated existing"
mb_logic_logger.info(f"[{model}] {status} model with ID {resolution_id}")
mb_logic_logger.info(f"[{model}] Done!")
mb_server_logger.info(f"[{model}] {status} model with ID {resolution_id}")
mb_server_logger.info(f"[{model}] Done!")


def _get_resolution_related_clusters(resolution_id: int) -> Select:
Expand Down Expand Up @@ -385,7 +385,7 @@ def _results_to_insert_tables(
* A Contains update Arrow table
* A Probabilities update Arrow table
"""
mb_logic_logger.info(f"[{resolution.name}] Wrangling data to insert tables")
mb_server_logger.info(f"[{resolution.name}] Wrangling data to insert tables")

# Create ID-Hash lookup for existing probabilities
lookup = sql_to_df(
Expand Down Expand Up @@ -451,7 +451,7 @@ def _results_to_insert_tables(
}
)

mb_logic_logger.info(f"[{resolution.name}] Wrangling complete!")
mb_server_logger.info(f"[{resolution.name}] Wrangling complete!")

return clusters, contains, probabilities

Expand Down Expand Up @@ -481,7 +481,7 @@ def insert_results(
Raises:
MatchboxResolutionNotFoundError: If the specified model doesn't exist.
"""
mb_logic_logger.info(
mb_server_logger.info(
f"[{resolution.name}] Writing results data with batch size {batch_size:,}"
)

Expand All @@ -499,18 +499,18 @@ def insert_results(
)

session.commit()
mb_logic_logger.info(f"[{resolution.name}] Removed old probabilities")
mb_server_logger.info(f"[{resolution.name}] Removed old probabilities")

except SQLAlchemyError as e:
session.rollback()
mb_logic_logger.error(
mb_server_logger.error(
f"[{resolution.name}] Failed to clear old probabilities: {str(e)}"
)
raise

with engine.connect() as conn:
try:
mb_logic_logger.info(
mb_server_logger.info(
f"[{resolution.name}] Inserting {clusters.shape[0]:,} results objects"
)

Expand All @@ -521,7 +521,7 @@ def insert_results(
batch_size=batch_size,
)

mb_logic_logger.info(
mb_server_logger.info(
f"[{resolution.name}] Successfully inserted {clusters.shape[0]} "
"objects into Clusters table"
)
Expand All @@ -533,7 +533,7 @@ def insert_results(
batch_size=batch_size,
)

mb_logic_logger.info(
mb_server_logger.info(
f"[{resolution.name}] Successfully inserted {contains.shape[0]} "
"objects into Contains table"
)
Expand All @@ -545,15 +545,15 @@ def insert_results(
batch_size=batch_size,
)

mb_logic_logger.info(
mb_server_logger.info(
f"[{resolution.name}] Successfully inserted "
f"{probabilities.shape[0]} objects into Probabilities table"
)

except SQLAlchemyError as e:
mb_logic_logger.error(
mb_server_logger.error(
f"[{resolution.name}] Failed to insert data: {str(e)}"
)
raise

mb_logic_logger.info(f"[{resolution.name}] Insert operation complete!")
mb_server_logger.info(f"[{resolution.name}] Insert operation complete!")
2 changes: 0 additions & 2 deletions test/fixtures/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
from matchbox.client.clean import company_name
from matchbox.client.helpers import cleaner, cleaners, select
from matchbox.client.helpers.selector import Selector
from matchbox.common.logging import get_logger
from matchbox.common.sources import Source

LOGGER = get_logger(__name__)
TEST_ROOT = Path(__file__).resolve().parents[1]


Expand Down

0 comments on commit b9f1c20

Please sign in to comment.