diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 000000000..35cde5cd5 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,4 @@ +coverage: + status: + project: off + patch: off diff --git a/tools/cell_census_builder/README.md b/tools/cell_census_builder/README.md index 2a3e832df..7afb948b4 100644 --- a/tools/cell_census_builder/README.md +++ b/tools/cell_census_builder/README.md @@ -20,10 +20,13 @@ TL;DR: The build process: -- Pass 1: stage all source H5AD files -- Pass 2: build the axis dataframes for each experiment. This is a single-threaded pass, building dense dataframes. -- Pass 3: build the X layers for each experiment. This is a concurrent pass, reading/writing X layers in parallel. -- Pass 4: optional, validate the above +- Step 1: Retrieve all source H5AD files, storing locally (parallelized, I/O-bound) +- Step 2: Create root collection and child objects (fast). +- Step 3: Write the axis dataframes for each experiment, filtering the datasets and cells to include (serialized iteration of dataset H5ADs). +- Step 4: Write the X layers for each experiment (parallelized iteration of filtered dataset H5ADs). +- Step 5: Write datasets manifest and summary info. +- (Optional) Consolidate TileDB data +- (Optional) Validate the entire Cell Census, re-reading from storage. Modes of operation: a) (default) creating the entire "cell census" using all files currently in the CELLxGENE repository. diff --git a/tools/cell_census_builder/__main__.py b/tools/cell_census_builder/__main__.py index a1089aab1..a81639b85 100644 --- a/tools/cell_census_builder/__main__.py +++ b/tools/cell_census_builder/__main__.py @@ -5,15 +5,15 @@ import os.path import sys from datetime import datetime, timezone -from typing import List, Tuple +from typing import List import tiledbsoma as soma from .anndata import open_anndata from .census_summary import create_census_summary from .consolidate import consolidate -from .datasets import Dataset, assign_soma_joinids, create_dataset_manifest -from .experiment_builder import ExperimentBuilder, populate_X_layers +from .datasets import Dataset, assign_dataset_soma_joinids, create_dataset_manifest +from .experiment_builder import ExperimentBuilder, populate_X_layers, reopen_experiment_builders from .globals import ( CENSUS_DATA_NAME, CENSUS_INFO_NAME, @@ -30,7 +30,7 @@ from .validate import validate -def make_experiment_builders(base_uri: str, args: argparse.Namespace) -> List[ExperimentBuilder]: +def make_experiment_builders() -> List[ExperimentBuilder]: """ Define all soma.Experiments to build in the census. 
@@ -48,13 +48,11 @@ def make_experiment_builders(base_uri: str, args: argparse.Namespace) -> List[Ex ] experiment_builders = [ # The soma.Experiments we want to build ExperimentBuilder( - base_uri=base_uri, name="homo_sapiens", anndata_cell_filter_spec=dict(organism_ontology_term_id="NCBITaxon:9606", assay_ontology_term_ids=RNA_SEQ), gene_feature_length_uris=GENE_LENGTH_URIS, ), ExperimentBuilder( - base_uri=base_uri, name="mus_musculus", anndata_cell_filter_spec=dict(organism_ontology_term_id="NCBITaxon:10090", assay_ontology_term_ids=RNA_SEQ), gene_feature_length_uris=GENE_LENGTH_URIS, @@ -76,15 +74,12 @@ def main() -> int: assets_path = uricat(args.uri, args.build_tag, "h5ads") # create the experiment builders - experiment_builders = make_experiment_builders(uricat(soma_path, CENSUS_DATA_NAME), args) + experiment_builders = make_experiment_builders() cc = 0 if args.subcommand == "build": cc = build(args, soma_path, assets_path, experiment_builders) - # sanity check for build completion - assert cc != 0 or all(e.is_finished() for e in experiment_builders) - if cc == 0 and (args.subcommand == "validate" or args.validate): validate(args, soma_path, assets_path, experiment_builders) @@ -133,63 +128,56 @@ def build( logging.error(e) return 1 - # Step 1 - get all source assets - datasets = build_step1_get_source_assets(args, assets_path) + # Step 1 - get all source datasets + datasets = build_step1_get_source_datasets(args, assets_path) - # Step 2 - build axis dataframes - top_level_collection, filtered_datasets = build_step2_create_axis( - soma_path, assets_path, datasets, experiment_builders, args - ) - assign_soma_joinids(filtered_datasets) - logging.info(f"({len(filtered_datasets)} of {len(datasets)}) suitable for processing.") + # Step 2 - create root collection, and all child objects, but do not populate any dataframes or matrices + root_collection = build_step2_create_root_collection(soma_path, experiment_builders) gc.collect() - # Step 3- create X layers - build_step3_create_X_layers(assets_path, filtered_datasets, experiment_builders, args) + # Step 3 - populate axes + filtered_datasets = build_step3_populate_obs_and_var_axes(assets_path, datasets, experiment_builders) + + # Step 4 - populate X layers + build_step4_populate_X_layers(assets_path, filtered_datasets, experiment_builders, args) gc.collect() - # Write out dataset manifest and summary information - create_dataset_manifest(top_level_collection[CENSUS_INFO_NAME], filtered_datasets) - create_census_summary_cell_counts( - top_level_collection[CENSUS_INFO_NAME], [e.census_summary_cell_counts for e in experiment_builders] - ) - create_census_summary(top_level_collection[CENSUS_INFO_NAME], experiment_builders, args.build_tag) + # Step 5 - write out dataset manifest and summary information + build_step5_populate_summary_info(root_collection, experiment_builders, filtered_datasets, args.build_tag) + for eb in experiment_builders: + eb.build_completed = True + + # consolidate TileDB data if args.consolidate: - consolidate(args, top_level_collection.uri) + consolidate(args, root_collection.uri) return 0 -def create_top_level_collections(soma_path: str) -> soma.Collection: +def populate_root_collection(root_collection: soma.Collection) -> soma.Collection: """ - Create the top-level SOMA collections for the Census. + Populate the root SOMA collection for the Census with metadata and sub-collections. - Returns the top-most collection. + Returns the root collection.
""" - top_level_collection = soma.Collection(soma_path, context=SOMA_TileDB_Context()) - if top_level_collection.exists(): - logging.error("Census already exists - aborting") - raise Exception("Census already exists - aborting") - top_level_collection.create() - # Set top-level metadata for the experiment - top_level_collection.metadata["created_on"] = datetime.now(tz=timezone.utc).isoformat(timespec="seconds") - top_level_collection.metadata["cxg_schema_version"] = CXG_SCHEMA_VERSION - top_level_collection.metadata["census_schema_version"] = CENSUS_SCHEMA_VERSION + # Set root metadata for the experiment + root_collection.metadata["created_on"] = datetime.now(tz=timezone.utc).isoformat(timespec="seconds") + root_collection.metadata["cxg_schema_version"] = CXG_SCHEMA_VERSION + root_collection.metadata["census_schema_version"] = CENSUS_SCHEMA_VERSION sha = get_git_commit_sha() - top_level_collection.metadata["git_commit_sha"] = sha + root_collection.metadata["git_commit_sha"] = sha # Create sub-collections for experiments, etc. for n in [CENSUS_INFO_NAME, CENSUS_DATA_NAME]: - cltn = soma.Collection(uricat(top_level_collection.uri, n), context=SOMA_TileDB_Context()).create() - top_level_collection.set(n, cltn, relative=True) + root_collection.add_new_collection(n) - return top_level_collection + return root_collection -def build_step1_get_source_assets(args: argparse.Namespace, assets_path: str) -> List[Dataset]: +def build_step1_get_source_datasets(args: argparse.Namespace, assets_path: str) -> List[Dataset]: logging.info("Build step 1 - get source assets - started") # Load manifest defining the datasets @@ -210,78 +198,122 @@ def build_step1_get_source_assets(args: argparse.Namespace, assets_path: str) -> return datasets -def build_step2_create_axis( - soma_path: str, - assets_path: str, - datasets: List[Dataset], - experiment_builders: List[ExperimentBuilder], - args: argparse.Namespace, -) -> Tuple[soma.Collection, List[Dataset]]: - """ - Create all objects, and populate the axis dataframes. - - Returns: the filtered datasets that will be included. This is simply - an optimization to allow subsequent X matrix writing to skip unused - datasets. 
- """ - logging.info("Build step 2 - axis creation - started") - - top_level_collection = create_top_level_collections(soma_path) - - # Create axis - for e in experiment_builders: - e.create(data_collection=top_level_collection[CENSUS_DATA_NAME]) - assert soma.Experiment(e.se_uri).exists() - - # Write obs axis and accumulate var axis (and remember the datasets that pass our filter) +def populate_obs_axis( + assets_path: str, datasets: List[Dataset], experiment_builders: List[ExperimentBuilder] +) -> List[Dataset]: filtered_datasets = [] N = len(datasets) * len(experiment_builders) - n = 1 + n = 0 + for dataset, ad in open_anndata(assets_path, datasets, backed="r"): dataset_total_cell_count = 0 - for e in experiment_builders: - dataset_total_cell_count += e.accumulate_axes(dataset, ad, progress=(n, N)) + + for eb in reopen_experiment_builders(experiment_builders): n += 1 + logging.info(f"{eb.name}: filtering dataset '{dataset.dataset_id}' ({n} of {N})") + ad_filtered = eb.filter_anndata_cells(ad) + + if len(ad_filtered.obs) == 0: # type:ignore + logging.info(f"{eb.name} - H5AD has no data after filtering, skipping {dataset.dataset_h5ad_path}") + continue + + # append to `obs`; accumulate `var` data + dataset_total_cell_count += eb.accumulate_axes(dataset, ad_filtered) - dataset.dataset_total_cell_count = dataset_total_cell_count + # dataset passes filter if either experiment includes cells from the dataset if dataset_total_cell_count > 0: filtered_datasets.append(dataset) + dataset.dataset_total_cell_count = dataset_total_cell_count + + for eb in experiment_builders: + logging.info(f"Experiment {eb.name} will contain {eb.n_obs} cells from {eb.n_datasets} datasets") + + return filtered_datasets + + +def populate_var_axis_and_presence(experiment_builders: List[ExperimentBuilder]) -> None: + for eb in reopen_experiment_builders(experiment_builders): + # populate `var`; create empty `presence` now that we have its dimensions + eb.populate_var_axis() + + +def build_step2_create_root_collection(soma_path: str, experiment_builders: List[ExperimentBuilder]) -> soma.Collection: + """ + Create all objects + + Returns: the root collection. + """ + logging.info("Build step 2 - Create root collection - started") + + with soma.Collection.create(soma_path, context=SOMA_TileDB_Context()) as root_collection: + populate_root_collection(root_collection) + + for e in experiment_builders: + e.create(census_data=root_collection[CENSUS_DATA_NAME]) + + logging.info("Build step 2 - Create root collection - finished") + return root_collection + + +def build_step3_populate_obs_and_var_axes( + assets_path: str, + datasets: List[Dataset], + experiment_builders: List[ExperimentBuilder], +) -> List[Dataset]: + """ + Populate obs and var axes. Filter cells from datasets for each experiment, as obs is built. 
+ """ + logging.info("Build step 3 - Populate obs and var axes - started") + + filtered_datasets = populate_obs_axis(assets_path, datasets, experiment_builders) + logging.info(f"({len(filtered_datasets)} of {len(datasets)}) datasets suitable for processing.") + + populate_var_axis_and_presence(experiment_builders) + + assign_dataset_soma_joinids(filtered_datasets) - # Commit / write var - for e in experiment_builders: - e.commit_axis() - logging.info(f"Experiment {e.name} will contain {e.n_obs} cells from {e.n_datasets} datasets") + logging.info("Build step 3 - Populate obs and var axes - finished") - logging.info("Build step 2 - axis creation - finished") - return top_level_collection, filtered_datasets + return filtered_datasets -def build_step3_create_X_layers( +def build_step4_populate_X_layers( assets_path: str, filtered_datasets: List[Dataset], experiment_builders: List[ExperimentBuilder], args: argparse.Namespace, ) -> None: """ - Create and populate all X layers + Populate X layers. """ - logging.info("Build step 3 - X layer creation - started") - # base_path = args.uri - - # Create X layers - for e in experiment_builders: - e.create_X_layers(filtered_datasets) - e.create_joinid_metadata() + logging.info("Build step 4 - Populate X layers - started") # Process all X data + for eb in reopen_experiment_builders(experiment_builders): + eb.create_X_with_layers() + populate_X_layers(assets_path, filtered_datasets, experiment_builders, args) - # tidy up and finish - for e in experiment_builders: - e.commit_X(consolidate=args.consolidate) - e.commit_presence_matrix(filtered_datasets) + for eb in reopen_experiment_builders(experiment_builders): + eb.populate_presence_matrix() + + logging.info("Build step 4 - Populate X layers - finished") + + +def build_step5_populate_summary_info( + root_collection: soma.Collection, + experiment_builders: List[ExperimentBuilder], + filtered_datasets: List[Dataset], + build_tag: str, +) -> None: + logging.info("Build step 5 - Populate summary info - started") + + with soma.Collection.open(root_collection[CENSUS_INFO_NAME].uri, "w", context=SOMA_TileDB_Context()) as census_info: + create_dataset_manifest(census_info, filtered_datasets) + create_census_summary_cell_counts(census_info, [e.census_summary_cell_counts for e in experiment_builders]) + create_census_summary(census_info, experiment_builders, build_tag) - logging.info("Build step 3 - X layer creation - finished") + logging.info("Build step 5 - Populate summary info - finished") def create_args_parser() -> argparse.ArgumentParser: diff --git a/tools/cell_census_builder/census_summary.py b/tools/cell_census_builder/census_summary.py index dde275319..44a0a7b92 100644 --- a/tools/cell_census_builder/census_summary.py +++ b/tools/cell_census_builder/census_summary.py @@ -6,8 +6,7 @@ import tiledbsoma as soma from .experiment_builder import ExperimentBuilder, get_summary_stats -from .globals import CENSUS_SCHEMA_VERSION, CENSUS_SUMMARY_NAME, SOMA_TileDB_Context -from .util import uricat +from .globals import CENSUS_SCHEMA_VERSION, CENSUS_SUMMARY_NAME def create_census_summary( @@ -29,9 +28,7 @@ def create_census_summary( df["soma_joinid"] = range(len(df)) # write to a SOMA dataframe - summary_uri = uricat(info_collection.uri, CENSUS_SUMMARY_NAME) - summary = soma.DataFrame(summary_uri, context=SOMA_TileDB_Context()) - summary.create(pa.Schema.from_pandas(df, preserve_index=False), index_column_names=["soma_joinid"]) - for batch in pa.Table.from_pandas(df, preserve_index=False).to_batches(): - 
summary.write(batch) - info_collection.set(CENSUS_SUMMARY_NAME, summary, relative=True) + with info_collection.add_new_dataframe( + CENSUS_SUMMARY_NAME, schema=pa.Schema.from_pandas(df, preserve_index=False), index_column_names=["soma_joinid"] + ) as summary: + summary.write(pa.Table.from_pandas(df, preserve_index=False)) diff --git a/tools/cell_census_builder/consolidate.py b/tools/cell_census_builder/consolidate.py index 99e08d8c5..51b49bb32 100644 --- a/tools/cell_census_builder/consolidate.py +++ b/tools/cell_census_builder/consolidate.py @@ -5,10 +5,8 @@ import tiledbsoma as soma -from .mp import create_process_pool_executor - -if soma.get_storage_engine() == "tiledb": - import tiledb +from .globals import SOMA_TileDB_Context +from .mp import create_process_pool_executor, log_on_broken_process_pool def consolidate(args: argparse.Namespace, uri: str) -> None: @@ -18,36 +16,47 @@ def consolidate(args: argparse.Namespace, uri: str) -> None: if soma.get_storage_engine() != "tiledb": return - census = soma.Collection(uri) - if not census.exists(): - return + logging.info("Consolidate: started") + # Gather URIs for any arrays that potentially need consolidation + with soma.Collection.open(uri, context=SOMA_TileDB_Context()) as census: + uris_to_consolidate = _walk_tree(census) + logging.info(f"Consolidate: found {len(uris_to_consolidate)} TileDB objects to consolidate") + + # Queue consolidator for each array with create_process_pool_executor(args) as ppe: - futures = consolidate_collection(args, census, ppe) - for future in concurrent.futures.as_completed(futures): - uri = future.result() - logging.info(f"Consolidate: completed {uri}") + futures = [ppe.submit(consolidate_tiledb_object, uri) for uri in uris_to_consolidate] + logging.info(f"Consolidate: {len(futures)} consolidation jobs queued") + + # Wait for consolidation to complete + for n, future in enumerate(concurrent.futures.as_completed(futures), start=1): + log_on_broken_process_pool(ppe) + uri = future.result() + logging.info(f"Consolidate: completed [{n} of {len(futures)}]: {uri}") + logging.info("Consolidate: finished") -def consolidate_collection( - args: argparse.Namespace, collection: soma.Collection, ppe: concurrent.futures.ProcessPoolExecutor -) -> List[concurrent.futures.Future[str]]: - futures = [] + +def _walk_tree(collection: soma.Collection) -> List[str]: + uris = [] for soma_obj in collection.values(): type = soma_obj.soma_type if type in ["SOMADataFrame", "SOMASparseNDArray", "SOMADenseNDArray"]: - logging.info(f"Consolidate: queuing {type} {soma_obj.uri}") - futures.append(ppe.submit(consolidate_tiledb_object, soma_obj.uri)) + uris.append(soma_obj.uri) elif type in ["SOMACollection", "SOMAExperiment", "SOMAMeasurement"]: - futures += consolidate_collection(args, soma_obj, ppe) + uris += _walk_tree(soma_obj) else: raise TypeError(f"Unknown SOMA type {type}.") - - return futures + return uris def consolidate_tiledb_object(uri: str) -> str: - logging.info(f"Consolidate: starting {uri}") + assert soma.get_storage_engine() == "tiledb" + + import tiledb + + logging.info(f"Consolidate: start uri {uri}") tiledb.consolidate(uri, config=tiledb.Config({"sm.consolidation.buffer_size": 1 * 1024**3})) tiledb.vacuum(uri) + logging.info(f"Consolidate: end uri {uri}") return uri diff --git a/tools/cell_census_builder/datasets.py b/tools/cell_census_builder/datasets.py index 466dfb6a7..a98db52f5 100644 --- a/tools/cell_census_builder/datasets.py +++ b/tools/cell_census_builder/datasets.py @@ -6,13 +6,12 @@ import pyarrow as pa import 
tiledbsoma as soma -from .globals import CENSUS_DATASETS_COLUMNS, CENSUS_DATASETS_NAME, SOMA_TileDB_Context -from .util import uricat +from .globals import CENSUS_DATASETS_COLUMNS, CENSUS_DATASETS_NAME T = TypeVar("T", bound="Dataset") -@dataclasses.dataclass +@dataclasses.dataclass # TODO: use attrs class Dataset: """ Type used to handle source H5AD datasets read from manifest @@ -58,7 +57,7 @@ def from_dataframe(cls: Type[T], datasets: pd.DataFrame) -> List["Dataset"]: return [Dataset(**r) for r in datasets.to_dict("records")] # type: ignore[misc] -def assign_soma_joinids(datasets: List[Dataset]) -> None: +def assign_dataset_soma_joinids(datasets: List[Dataset]) -> None: for joinid, dataset in enumerate(datasets): dataset.soma_joinid = joinid @@ -72,9 +71,9 @@ def create_dataset_manifest(info_collection: soma.Collection, datasets: List[Dat manifest_df = manifest_df[CENSUS_DATASETS_COLUMNS + ["soma_joinid"]] # write to a SOMA dataframe - manifest_uri = uricat(info_collection.uri, CENSUS_DATASETS_NAME) - manifest = soma.DataFrame(manifest_uri, context=SOMA_TileDB_Context()) - manifest.create(pa.Schema.from_pandas(manifest_df, preserve_index=False), index_column_names=["soma_joinid"]) - for batch in pa.Table.from_pandas(manifest_df, preserve_index=False).to_batches(): - manifest.write(batch) - info_collection.set(CENSUS_DATASETS_NAME, manifest, relative=True) + with info_collection.add_new_dataframe( + CENSUS_DATASETS_NAME, + schema=pa.Schema.from_pandas(manifest_df, preserve_index=False), + index_column_names=["soma_joinid"], + ) as manifest: + manifest.write(pa.Table.from_pandas(manifest_df, preserve_index=False)) diff --git a/tools/cell_census_builder/experiment_builder.py b/tools/cell_census_builder/experiment_builder.py index eee0026c5..a3645f065 100644 --- a/tools/cell_census_builder/experiment_builder.py +++ b/tools/cell_census_builder/experiment_builder.py @@ -3,16 +3,18 @@ import gc import io import logging -from enum import IntEnum -from typing import Dict, List, Optional, Sequence, Tuple, TypedDict, Union, overload +from contextlib import ExitStack +from typing import Dict, Generator, List, Optional, Sequence, Tuple, TypedDict, Union, overload import anndata import numpy as np import numpy.typing as npt import pandas as pd import pyarrow as pa +import somacore import tiledbsoma as soma from scipy import sparse +from somacore.options import OpenMode from .anndata import AnnDataFilterSpec, make_anndata_cell_filter, open_anndata from .datasets import Dataset @@ -29,7 +31,7 @@ MEASUREMENT_RNA_NAME, SOMA_TileDB_Context, ) -from .mp import create_process_pool_executor +from .mp import create_process_pool_executor, log_on_broken_process_pool from .source_assets import cat_file from .summary_cell_counts import accumulate_summary_counts, init_summary_counts_accumulator from .tissue_mapper import TissueMapper # type: ignore @@ -56,6 +58,13 @@ tissue_mapper: TissueMapper = TissueMapper() +def _assert_open_for_write(obj: somacore.SOMAObject) -> None: + assert obj is not None + assert obj.exists(obj.uri) + assert obj.mode == "w" + assert not obj.closed + + class ExperimentBuilder: """ Class to help build a parameterized SOMA experiment, where key parameters are: @@ -70,29 +79,11 @@ class ExperimentBuilder: anndata_cell_filter_spec: AnnDataFilterSpec gene_feature_length_uris: List[str] gene_feature_length: pd.DataFrame - build_state: "ExperimentBuilder.BuildState" - - # builder state sanity check, used to catch usage errors. 
- - class BuildState(IntEnum): - Initialized = 0 - Created = 1 - AxisWritten = 2 - X_Created = 3 - X_JoinIdMetadataCreated = 4 - X_Written = 5 - X_Presence_Written = 6 - def next(self) -> "ExperimentBuilder.BuildState": - return ExperimentBuilder.BuildState(self.value + 1) - - def __init__( - self, base_uri: str, name: str, anndata_cell_filter_spec: AnnDataFilterSpec, gene_feature_length_uris: List[str] - ): + def __init__(self, name: str, anndata_cell_filter_spec: AnnDataFilterSpec, gene_feature_length_uris: List[str]): self.name = name self.anndata_cell_filter_spec = anndata_cell_filter_spec self.gene_feature_length_uris = gene_feature_length_uris - self.se_uri = uricat(base_uri, name) # accumulated state self.n_obs: int = 0 @@ -101,10 +92,13 @@ def __init__( self.n_datasets: int = 0 self.n_donors: int = 0 # Caution: defined as (unique dataset_id, donor_id) tuples, *excluding* some values self.var_df: pd.DataFrame = pd.DataFrame(columns=["feature_id", "feature_name"]) - self.dataset_obs_joinid_start: Dict[str, int] + self.dataset_obs_joinid_start: Dict[str, int] = {} self.census_summary_cell_counts = init_summary_counts_accumulator() + self.experiment: Optional[soma.Experiment] = None # initialized in create() + self.experiment_uri: Optional[str] = None # initialized in create() + self.global_var_joinids: Optional[pd.DataFrame] = None self.presence: Dict[int, Tuple[npt.NDArray[np.bool_], npt.NDArray[np.int64]]] = {} - self.build_state = ExperimentBuilder.BuildState.Initialized + self.build_completed: bool = False self.load_assets() @@ -123,74 +117,46 @@ def load_assets(self) -> None: ) logging.info(f"Loaded gene lengths external reference for {self.name}, {len(self.gene_feature_length)} genes.") - def is_finished(self) -> bool: - return self.build_state == ExperimentBuilder.BuildState.X_Presence_Written - - def create(self, data_collection: soma.Collection) -> None: - assert self.build_state == ExperimentBuilder.BuildState.Initialized + def create(self, census_data: soma.Collection) -> None: + """Create experiment within the specified Collection with a single Measurement.""" - """Make experiment at `uri` with a single Measurement and add to top-level collection.""" - logging.info(f"{self.name}: create experiment at {self.se_uri}") + logging.info(f"{self.name}: create experiment at {uricat(census_data.uri, self.name)}") - se = soma.Experiment(self.se_uri, context=SOMA_TileDB_Context()) - if se.exists(): - logging.error("Census already exists - aborting") - raise Exception("Census already exists") - se.create() - data_collection.set(self.name, se, relative=True) + self.experiment = census_data.add_new_collection(self.name, soma.Experiment) + self.experiment_uri = self.experiment.uri # create `ms` - se.set("ms", soma.Collection(uricat(se.uri, "ms")).create(), relative=True) + ms = self.experiment.add_new_collection("ms") # create `obs` obs_schema = pa.schema(list(CENSUS_OBS_TERM_COLUMNS.items())) - se.set( - "obs", - soma.DataFrame(uricat(se.uri, "obs")).create( - obs_schema, - index_column_names=["soma_joinid"], - platform_config=CENSUS_OBS_PLATFORM_CONFIG, - ), - relative=True, + self.experiment.add_new_dataframe( + "obs", schema=obs_schema, index_column_names=["soma_joinid"], platform_config=CENSUS_OBS_PLATFORM_CONFIG ) # make measurement and add to ms collection - measurement = soma.Measurement(uricat(se.ms.uri, MEASUREMENT_RNA_NAME)).create() - se.ms.set("RNA", measurement, relative=True) + rna_measurement = ms.add_new_collection(MEASUREMENT_RNA_NAME, soma.Measurement) - # make the 
`var` in the measurement + # create `var` in the measurement var_schema = pa.schema(list(CENSUS_VAR_TERM_COLUMNS.items())) - measurement.set( + rna_measurement.add_new_dataframe( "var", - soma.DataFrame(uricat(measurement.uri, "var")).create( - var_schema, - index_column_names=["soma_joinid"], - platform_config=CENSUS_VAR_PLATFORM_CONFIG, - ), - relative=True, + schema=var_schema, + index_column_names=["soma_joinid"], + platform_config=CENSUS_VAR_PLATFORM_CONFIG, ) - # make the `X` collection (but not the actual layers) - measurement.set("X", soma.Collection(uricat(measurement.uri, "X")).create(), relative=True) - - self.build_state = self.build_state.next() - return + def filter_anndata_cells(self, ad: anndata.AnnData) -> Union[None, anndata.AnnData]: + anndata_cell_filter = make_anndata_cell_filter(self.anndata_cell_filter_spec) + return anndata_cell_filter(ad, retain_X=False) - def accumulate_axes(self, dataset: Dataset, ad: anndata.AnnData, progress: Tuple[int, int] = (0, 0)) -> int: + def accumulate_axes(self, dataset: Dataset, ad: anndata.AnnData) -> int: """ - Write obs, accumate var. + Write obs, accumulate var. Returns: number of cells that make it past the experiment filter. """ - progmsg = f"({progress[0]} of {progress[1]})" - logging.info(f"{self.name}: accumulate axis for dataset '{dataset.dataset_id}' {progmsg}") - assert self.build_state == ExperimentBuilder.BuildState.Created - - anndata_cell_filter = make_anndata_cell_filter(self.anndata_cell_filter_spec) - ad = anndata_cell_filter(ad, retain_X=False) - if ad.n_obs == 0: - logging.info(f"{self.name} - H5AD has no data after filtering, skipping {dataset.dataset_h5ad_path}") - return 0 + assert len(ad.obs) > 0 # Narrow columns just to minimize memory footprint. Summary cell counting # requires 'organism', do be careful not to delete that. @@ -203,22 +169,15 @@ def accumulate_axes(self, dataset: Dataset, ad: anndata.AnnData, progress: Tuple add_tissue_mapping(obs_df, dataset.dataset_id) # Accumulate aggregation counts - self._accumulate_summary_cell_counts(obs_df) + self.census_summary_cell_counts = accumulate_summary_counts(self.census_summary_cell_counts, obs_df) # drop columns we don't want to write obs_df = obs_df[list(CENSUS_OBS_TERM_COLUMNS)] obs_df = anndata_ordered_bool_issue_853_workaround(obs_df) - se = soma.Experiment(self.se_uri, context=SOMA_TileDB_Context()) - assert se.exists() + self.populate_obs_axis(obs_df) - pa_table = pa.Table.from_pandas( - obs_df, - preserve_index=False, - columns=list(CENSUS_OBS_TERM_COLUMNS), - ) - for pa_batch in pa_table.to_batches(): - se.obs.write(pa_batch) + self.dataset_obs_joinid_start[dataset.dataset_id] = self.n_obs # Accumulate the union of all var ids/names (for raw and processed), to be later persisted. # NOTE: assumes raw.var is None, OR has same index as var. 
Currently enforced in open_anndata(), @@ -235,11 +194,21 @@ def accumulate_axes(self, dataset: Dataset, ad: anndata.AnnData, progress: Tuple self.n_datasets += 1 return len(obs_df) - def commit_axis(self) -> None: - logging.info(f"{self.name}: commit axes") - se = soma.Experiment(self.se_uri) - assert se.exists() - assert self.build_state == ExperimentBuilder.BuildState.Created + def populate_obs_axis(self, obs_df: pd.DataFrame) -> None: + _assert_open_for_write(self.experiment) + + logging.debug(f"experiment {self.name} obs = {obs_df.shape}") + pa_table = pa.Table.from_pandas( + obs_df, + preserve_index=False, + columns=list(CENSUS_OBS_TERM_COLUMNS), + ) + self.experiment.obs.write(pa_table) # type:ignore + + def populate_var_axis(self) -> None: + logging.info(f"{self.name}: populate var axis") + + _assert_open_for_write(self.experiment.ms["RNA"].var) # type:ignore[union-attr] # if is possible there is nothing to write if len(self.var_df) > 0: @@ -250,97 +219,53 @@ def commit_axis(self) -> None: self.var_df = anndata_ordered_bool_issue_853_workaround(self.var_df) - se.ms["RNA"].var.write( - pa.RecordBatch.from_pandas( + self.experiment.ms["RNA"].var.write( # type:ignore + pa.Table.from_pandas( self.var_df, preserve_index=False, columns=list(CENSUS_VAR_TERM_COLUMNS), ) ) + self.global_var_joinids = self.var_df[["feature_id", "soma_joinid"]].set_index("feature_id") + self.n_var = len(self.var_df) - self.build_state = self.build_state.next() - return - def create_X_layers(self, datasets: List[Dataset]) -> None: + def create_X_with_layers(self) -> None: """ Create layers in ms['RNA']/X """ logging.info(f"{self.name}: create X layers") - se = soma.Experiment(self.se_uri, context=SOMA_TileDB_Context()) - assert se.exists() - assert se.ms[MEASUREMENT_RNA_NAME].exists() - assert self.n_obs >= 0 and self.n_var >= 0 - assert self.build_state == ExperimentBuilder.BuildState.AxisWritten - assert self.n_obs == 0 or self.n_datasets > 0 + + rna_measurement = self.experiment.ms[MEASUREMENT_RNA_NAME] # type:ignore + _assert_open_for_write(rna_measurement) + + # make the `X` collection + rna_measurement.add_new_collection("X") # SOMA does not currently support empty arrays, so special case this corner-case. if self.n_obs > 0: assert self.n_var > 0 - measurement = se.ms[MEASUREMENT_RNA_NAME] for layer_name in CENSUS_X_LAYERS: - snda = soma.SparseNDArray(uricat(measurement.X.uri, layer_name), context=SOMA_TileDB_Context()).create( - CENSUS_X_LAYERS[layer_name], - (self.n_obs, self.n_var), + rna_measurement["X"].add_new_sparse_ndarray( + layer_name, + type=CENSUS_X_LAYERS[layer_name], + shape=(self.n_obs, self.n_var), platform_config=CENSUS_X_LAYERS_PLATFORM_CONFIG[layer_name], ) - measurement.X.set(layer_name, snda, relative=True) - - presence_matrix = soma.SparseNDArray( - uricat(measurement.uri, FEATURE_DATASET_PRESENCE_MATRIX_NAME), context=SOMA_TileDB_Context() - ) - max_dataset_joinid = max(d.soma_joinid for d in datasets) - presence_matrix.create(pa.bool_(), shape=(max_dataset_joinid + 1, self.n_var)) - measurement.set(FEATURE_DATASET_PRESENCE_MATRIX_NAME, presence_matrix, relative=True) - - self.build_state = self.build_state.next() - return - - def create_joinid_metadata(self) -> None: - logging.info(f"{self.name}: make joinid metadata") - assert self.build_state >= ExperimentBuilder.BuildState.AxisWritten - se = soma.Experiment(self.se_uri, context=SOMA_TileDB_Context()) - assert se.exists() - - # Map of dataset_id -> starting soma_joinid for obs axis. 
This code _assumes_ - # that soma_joinid is contiguous (ie, no deletions in obs), which is - # known true for our use case (aggregating h5ads). - self.dataset_obs_joinid_start = ( - se.obs.read(column_names=["dataset_id", "soma_joinid"]) - .concat() - .to_pandas() - .groupby("dataset_id") - .min() - .soma_joinid.to_dict() - ) - - self.build_state = self.build_state.next() - - def commit_X(self, *, consolidate: bool = False) -> None: - logging.info(f"{self.name}: commit X") - assert self.build_state == ExperimentBuilder.BuildState.X_JoinIdMetadataCreated - self.build_state = self.build_state.next() - def _accumulate_summary_cell_counts(self, obs_df: pd.DataFrame) -> None: - """ - Add summary counts to the census_summary_cell_counts dataframe - """ - assert "dataset_id" in obs_df - assert len(obs_df) > 0 - self.census_summary_cell_counts = accumulate_summary_counts(self.census_summary_cell_counts, obs_df) - - def commit_presence_matrix(self, datasets: List[Dataset]) -> None: + def populate_presence_matrix(self) -> None: """ Save presence matrix per Experiment """ - assert self.build_state == ExperimentBuilder.BuildState.X_Written + _assert_open_for_write(self.experiment) + # SOMA does not currently support empty arrays, so special case this corner-case. if len(self.presence) > 0: - max_dataset_joinid = max(d.soma_joinid for d in datasets) - - # A few sanity checks + # sanity check assert len(self.presence) == self.n_datasets - assert max_dataset_joinid >= max(self.presence.keys()) # key is dataset joinid + + max_dataset_joinid = max(self.presence.keys()) # LIL is fast way to create spmatrix pm = sparse.lil_array((max_dataset_joinid + 1, self.n_var), dtype=bool) @@ -352,11 +277,11 @@ def commit_presence_matrix(self, datasets: List[Dataset]) -> None: pm.eliminate_zeros() assert pm.count_nonzero() == pm.nnz assert pm.dtype == bool - se = soma.Experiment(self.se_uri, context=SOMA_TileDB_Context()) - fdpm: soma.SparseNDArray = se.ms[MEASUREMENT_RNA_NAME][FEATURE_DATASET_PRESENCE_MATRIX_NAME] - fdpm.write(pa.SparseCOOTensor.from_scipy(pm)) - self.build_state = self.build_state.next() + fdpm = self.experiment.ms["RNA"].add_new_sparse_ndarray( # type:ignore + FEATURE_DATASET_PRESENCE_MATRIX_NAME, type=pa.bool_(), shape=(max_dataset_joinid + 1, self.n_var) + ) + fdpm.write(pa.SparseCOOTensor.from_scipy(pm)) def _accumulate_all_X_layers( @@ -387,9 +312,9 @@ def _accumulate_all_X_layers( # this dataset has no data for this experiment continue - se = soma.Experiment(eb.se_uri, context=SOMA_TileDB_Context()) - assert se is not None - assert se.exists() + if eb.n_var == 0: + # edge case for test builds that have no data for an entire experiment (organism) + continue anndata_cell_filter = make_anndata_cell_filter(eb.anndata_cell_filter_spec) ad = anndata_cell_filter(unfiltered_ad) @@ -408,14 +333,7 @@ def _accumulate_all_X_layers( f"{eb.name}: saving X layer '{layer_name}' for dataset '{dataset.dataset_id}' " f"({progress[0]} of {progress[1]})" ) - global_var_joinids = ( - se.ms[ms_name] - .var.read(column_names=["feature_id", "soma_joinid"]) - .concat() - .to_pandas() - .set_index("feature_id") - ) - local_var_joinids = raw_var.join(global_var_joinids).soma_joinid.to_numpy() + local_var_joinids = raw_var.join(eb.global_var_joinids).soma_joinid.to_numpy() assert (local_var_joinids >= 0).all(), f"Illegal join id, {dataset.dataset_id}" for n, X in enumerate(array_chunker(raw_X), start=1): @@ -427,7 +345,8 @@ def _accumulate_all_X_layers( assert (col >= 0).all() X_remap = sparse.coo_array((X.data, (row, 
col)), shape=(eb.n_obs, eb.n_var)) X_remap.eliminate_zeros() - se.ms[ms_name].X[layer_name].write(pa.SparseCOOTensor.from_scipy(X_remap)) + with soma.Experiment.open(eb.experiment_uri, "w") as experiment: + experiment.ms[ms_name].X[layer_name].write(pa.SparseCOOTensor.from_scipy(X_remap)) gc.collect() # Save presence information by dataset_id @@ -489,12 +408,14 @@ def _accumulate_X( """ for eb in experiment_builders: # sanity checks - assert eb.build_state == ExperimentBuilder.BuildState.X_JoinIdMetadataCreated assert eb.dataset_obs_joinid_start is not None + # clear the experiment object to avoid pickling error + eb.experiment = None dataset_obs_joinid_starts = [ eb.dataset_obs_joinid_start.get(dataset.dataset_id, None) for eb in experiment_builders ] + if executor is not None: return executor.submit( _accumulate_all_X_layers, @@ -507,7 +428,7 @@ def _accumulate_X( ) else: return _accumulate_all_X_layers( - assets_path, dataset, experiment_builders, dataset_obs_joinid_starts, "RNA", progress + assets_path, dataset, experiment_builders, dataset_obs_joinid_starts, MEASUREMENT_RNA_NAME, progress ) @@ -515,9 +436,8 @@ def populate_X_layers( assets_path: str, datasets: List[Dataset], experiment_builders: List[ExperimentBuilder], args: argparse.Namespace ) -> None: """ - Do all X layer processing for all Experiments. + Do all X layer processing for all Experiments. Also accumulate presence matrix data for later writing. """ - # populate X layers presence: List[PresenceResult] = [] if args.multi_process: @@ -534,17 +454,18 @@ def populate_X_layers( } for n, f in enumerate(concurrent.futures.as_completed(futures), start=1): + log_on_broken_process_pool(pe) # propagate exceptions - not expecting any other return values presence += f.result() - logging.info(f"pass 2, {futures[f].dataset_id} ({n} of {len(futures)}) complete.") + logging.info(f"populate X for dataset {futures[f].dataset_id} ({n} of {len(futures)}) complete.") else: for n, dataset in enumerate(datasets, start=1): presence += _accumulate_X(assets_path, dataset, experiment_builders, progress=(n, len(datasets))) eb_by_name = {e.name: e for e in experiment_builders} - for _, dataset_soma_joinid, eb_name, pres_data, pres_col in presence: - eb_by_name[eb_name].presence[dataset_soma_joinid] = (pres_data, pres_col) + for _, dataset_soma_joinid, eb_name, pres_dataset, pres_col in presence: + eb_by_name[eb_name].presence[dataset_soma_joinid] = (pres_dataset, pres_col) class SummaryStats(TypedDict): @@ -577,3 +498,21 @@ def add_tissue_mapping(obs_df: pd.DataFrame, dataset_id: str) -> None: id: tissue_mapper.get_label_from_writable_id(id) for id in tissue_general_id_map.values() } obs_df["tissue_general"] = obs_df.tissue_general_ontology_term_id.map(tissue_general_label_map) + + +def reopen_experiment_builders( + experiment_builders: List[ExperimentBuilder], mode: OpenMode = "w" +) -> Generator[ExperimentBuilder, None, None]: + """ + Re-opens all ExperimentBuilder's `experiment` for writing as a Generator, allowing iterating code to use + the experiment for writing, without having to explicitly close it. 
+ """ + with ExitStack() as experiments_stack: + for eb in experiment_builders: + # open experiments for write and ensure they are closed when exiting + assert eb.experiment is None or eb.experiment.closed + eb.experiment = soma.Experiment.open(eb.experiment_uri, mode, context=SOMA_TileDB_Context()) + experiments_stack.enter_context(eb.experiment) + + for eb in experiment_builders: + yield eb diff --git a/tools/cell_census_builder/globals.py b/tools/cell_census_builder/globals.py index 2771111d4..3bfa77eb9 100644 --- a/tools/cell_census_builder/globals.py +++ b/tools/cell_census_builder/globals.py @@ -1,10 +1,11 @@ +import time from typing import Set import pyarrow as pa import tiledb import tiledbsoma as soma -CENSUS_SCHEMA_VERSION = "0.1.0" +CENSUS_SCHEMA_VERSION = "0.1.1" CXG_SCHEMA_VERSION = "3.0.0" # version we write to the census CXG_SCHEMA_VERSION_IMPORT = [CXG_SCHEMA_VERSION] # versions we can ingest @@ -129,6 +130,8 @@ }, }, "offsets_filters": ["DoubleDeltaFilter", {"_type": "ZstdFilter", "level": 19}], + # TODO: enable when https://github.com/single-cell-data/TileDB-SOMA/issues/988 is fixed + # "allows_duplicates": True, } } } @@ -145,6 +148,8 @@ "capacity": 2**16, "dims": {"soma_joinid": {"filters": ["DoubleDeltaFilter", {"_type": "ZstdFilter", "level": 19}]}}, "offsets_filters": ["DoubleDeltaFilter", {"_type": "ZstdFilter", "level": 19}], + # TODO: enable when https://github.com/single-cell-data/TileDB-SOMA/issues/988 is fixed + # "allows_duplicates": True, } } } @@ -167,6 +172,8 @@ "attrs": {"soma_data": {"filters": ["ByteShuffleFilter", {"_type": "ZstdFilter", "level": 5}]}}, "cell_order": "row-major", "tile_order": "row-major", + # TODO: enable when https://github.com/single-cell-data/TileDB-SOMA/issues/988 is fixed + # "allows_duplicates": True, }, } } @@ -216,11 +223,28 @@ # Global TileDB context _TileDB_Ctx: tiledb.Ctx = None +# The logical timestamp at which all builder data should be recorded +WRITE_TIMESTAMP = int(time.time() * 1000) + +# Using "end of time" for read_timestamp means that all writes are visible, no matter what write timestamp was used +END_OF_TIME = 0xFFFFFFFFFFFFFFFF + def SOMA_TileDB_Context() -> soma.options.SOMATileDBContext: global _SOMA_TileDB_Context - if _SOMA_TileDB_Context is None: - _SOMA_TileDB_Context = soma.options.SOMATileDBContext(tiledb_ctx=TileDB_Ctx()) + if _SOMA_TileDB_Context is None or _SOMA_TileDB_Context != TileDB_Ctx(): + # Set write timestamp to "now", so that we use consistent timestamps across all writes (mostly for aesthetic + # reasons). Set read timestamps to be same as write timestamp so that post-build validation reads can "see" + # the writes. Without setting read timestamp explicitly, the read timestamp would default to a time that + # prevents seeing the builder's writes. + _SOMA_TileDB_Context = soma.options.SOMATileDBContext( + tiledb_ctx=TileDB_Ctx(), + # TODO: Setting an explicit write timestamp causes later reads to fail! 
# write_timestamp=write_timestamp, + # TODO: We *should* be able to set this equal to WRITE_TIMESTAMP, but as specifying a write_timestamp is + # problematic, we must use "end of time" for now + read_timestamp=END_OF_TIME, + ) return _SOMA_TileDB_Context diff --git a/tools/cell_census_builder/mp.py b/tools/cell_census_builder/mp.py index 9c7fee506..d00052bc5 100644 --- a/tools/cell_census_builder/mp.py +++ b/tools/cell_census_builder/mp.py @@ -49,3 +49,26 @@ def create_process_pool_executor( initializer=process_initializer, initargs=(args.verbose,), ) + + +def log_on_broken_process_pool(ppe: concurrent.futures.ProcessPoolExecutor) -> None: + """ + There are a number of conditions where the Process Pool can be broken, + such that it will hang during shutdown. This will cause the context __exit__ + to hang indefinitely, as it calls ProcessPoolExecutor.shutdown with + `wait=True`. + + An example condition which can cause a deadlock is an OOM, where the + reaper kills a process. + + This routine is used to detect the condition and log the error, so a + human has a chance of detecting/diagnosing it. + + Caution: uses ProcessPoolExecutor internal API, as this state is not + otherwise visible. + """ + + if ppe._broken: + logging.critical(f"Process pool broken and may fail or hang: {ppe._broken}") + + return diff --git a/tools/cell_census_builder/summary_cell_counts.py b/tools/cell_census_builder/summary_cell_counts.py index b51163cde..ab744d6cf 100644 --- a/tools/cell_census_builder/summary_cell_counts.py +++ b/tools/cell_census_builder/summary_cell_counts.py @@ -6,10 +6,9 @@ import pyarrow as pa import tiledbsoma as soma -from .globals import CENSUS_SUMMARY_CELL_COUNTS_COLUMNS, CENSUS_SUMMARY_CELL_COUNTS_NAME, SOMA_TileDB_Context +from .globals import CENSUS_SUMMARY_CELL_COUNTS_COLUMNS, CENSUS_SUMMARY_CELL_COUNTS_NAME from .util import ( anndata_ordered_bool_issue_853_workaround, - uricat, ) @@ -31,12 +30,12 @@ def create_census_summary_cell_counts( df = anndata_ordered_bool_issue_853_workaround(df) # write to a SOMA dataframe - summary_counts_uri = uricat(info_collection.uri, CENSUS_SUMMARY_CELL_COUNTS_NAME) - summary_counts = soma.DataFrame(summary_counts_uri, context=SOMA_TileDB_Context()) - summary_counts.create(pa.Schema.from_pandas(df, preserve_index=False), index_column_names=["soma_joinid"]) - for batch in pa.Table.from_pandas(df, preserve_index=False).to_batches(): - summary_counts.write(batch) - info_collection.set(CENSUS_SUMMARY_CELL_COUNTS_NAME, summary_counts, relative=True) + with info_collection.add_new_dataframe( + CENSUS_SUMMARY_CELL_COUNTS_NAME, + schema=pa.Schema.from_pandas(df, preserve_index=False), + index_column_names=["soma_joinid"], + ) as cell_counts: + cell_counts.write(pa.Table.from_pandas(df, preserve_index=False)) def init_summary_counts_accumulator() -> pd.DataFrame: @@ -56,7 +55,9 @@ def accumulate_summary_counts(current: pd.DataFrame, obs_df: pd.DataFrame) -> pd Add summary counts to the census_summary_cell_counts dataframe """ assert "dataset_id" in obs_df - assert len(obs_df) > 0 + + if len(obs_df) == 0: + return current CATEGORIES = [ # term_id, label diff --git a/tools/cell_census_builder/tests/conftest.py b/tools/cell_census_builder/tests/conftest.py index 8a36c4d19..8463fa338 100644 --- a/tools/cell_census_builder/tests/conftest.py +++ b/tools/cell_census_builder/tests/conftest.py @@ -3,11 +3,12 @@ from typing import List import anndata +import attrs import numpy as np import pandas as pd import pytest from _pytest.monkeypatch import MonkeyPatch -from
scipy.sparse import csc_matrix +from scipy import sparse from tools.cell_census_builder.datasets import Dataset from tools.cell_census_builder.globals import ( @@ -16,10 +17,19 @@ from tools.cell_census_builder.mp import process_initializer -def h5ad() -> anndata.AnnData: +@attrs.define(frozen=True) +class Organism: + name: str + organism_ontology_term_id: str + + +ORGANISMS = [Organism("homo_sapiens", "NCBITaxon:9606"), Organism("mus_musculus", "NCBITaxon:10090")] + + +def get_h5ad(organism: Organism) -> anndata.AnnData: X = np.random.randint(5, size=(4, 4)) # The builder only supports sparse matrices - X = csc_matrix(X) + X = sparse.csr_matrix(X) # Create obs obs_dataframe = pd.DataFrame( @@ -28,7 +38,7 @@ def h5ad() -> anndata.AnnData: "cell_type_ontology_term_id": "CL:0000192", "assay_ontology_term_id": "EFO:0008720", "disease_ontology_term_id": "PATO:0000461", - "organism_ontology_term_id": "NCBITaxon:9606", # TODO: add one that fails the filter + "organism_ontology_term_id": organism.organism_ontology_term_id, "sex_ontology_term_id": "unknown", "tissue_ontology_term_id": "CL:0000192", "is_primary_data": False, @@ -49,13 +59,13 @@ def h5ad() -> anndata.AnnData: obs = obs_dataframe # Create vars - feature_name = pd.Series(data=["a", "b", "c", "d"]) + feature_name = pd.Series(data=[f"{organism.name}_{g}" for g in ["a", "b", "c", "d"]]) var_dataframe = pd.DataFrame( data={ "feature_biotype": "gene", "feature_is_filtered": False, "feature_name": "ERCC-00002 (spike-in control)", - "feature_reference": "NCBITaxon:9606", + "feature_reference": organism.organism_ontology_term_id, }, index=feature_name, ) @@ -91,19 +101,21 @@ def soma_path(tmp_path: pathlib.Path) -> str: @pytest.fixture def datasets(assets_path: str) -> List[Dataset]: - first_h5ad = h5ad() - second_h5ad = h5ad() - - first_h5ad_path = f"{assets_path}/first.h5ad" - second_h5ad_path = f"{assets_path}/second.h5ad" - - first_h5ad.write_h5ad(first_h5ad_path) - second_h5ad.write_h5ad(second_h5ad_path) - - return [ - Dataset(dataset_id="first_id", corpora_asset_h5ad_uri="mock", dataset_h5ad_path=first_h5ad_path), - Dataset(dataset_id="second_id", corpora_asset_h5ad_uri="mock", dataset_h5ad_path=second_h5ad_path), - ] + datasets = [] + for organism in ORGANISMS: + for dataset_id in range(2): + h5ad = get_h5ad(organism) + h5ad_path = f"{assets_path}/{organism.name}_{dataset_id}.h5ad" + h5ad.write_h5ad(h5ad_path) + datasets.append( + Dataset( + dataset_id=f"{organism.name}_{dataset_id}", + corpora_asset_h5ad_uri="mock", + dataset_h5ad_path=h5ad_path, + ), + ) + + return datasets @pytest.fixture() diff --git a/tools/cell_census_builder/tests/test_builder.py b/tools/cell_census_builder/tests/test_builder.py index 63571ef7e..273d4244c 100644 --- a/tools/cell_census_builder/tests/test_builder.py +++ b/tools/cell_census_builder/tests/test_builder.py @@ -3,6 +3,7 @@ from typing import List from unittest.mock import patch +import numpy as np import pandas as pd import pyarrow as pa import tiledb @@ -13,8 +14,9 @@ from tools.cell_census_builder.globals import ( CENSUS_DATA_NAME, CENSUS_INFO_NAME, + FEATURE_DATASET_PRESENCE_MATRIX_NAME, + MEASUREMENT_RNA_NAME, ) -from tools.cell_census_builder.util import uricat from tools.cell_census_builder.validate import validate @@ -25,43 +27,64 @@ def test_base_builder_creation( Runs the builder, queries the census and performs a set of base assertions. 
""" with patch("tools.cell_census_builder.__main__.prepare_file_system"), patch( - "tools.cell_census_builder.__main__.build_step1_get_source_assets", return_value=datasets + "tools.cell_census_builder.__main__.build_step1_get_source_datasets", return_value=datasets ): - experiment_builders = make_experiment_builders(uricat(soma_path, CENSUS_DATA_NAME), []) # type: ignore + experiment_builders = make_experiment_builders() from types import SimpleNamespace args = SimpleNamespace(multi_process=False, consolidate=False, build_tag="test_tag") - return_value = build(args, soma_path, assets_path, experiment_builders) # type: ignore + return_value = build(args, soma_path, assets_path, experiment_builders) # type: ignore[arg-type] # return_value = 0 means that the build succeeded assert return_value == 0 # validate the cell_census - return_value = validate(args, soma_path, assets_path, experiment_builders) # type: ignore + return_value = validate(args, soma_path, assets_path, experiment_builders) # type: ignore[arg-type] assert return_value is True # Query the census and do assertions - census = soma.Collection( + with soma.Collection.open( uri=soma_path, context=soma.options.SOMATileDBContext(tiledb_ctx=tiledb.Ctx({"vfs.s3.region": "us-west-2"})), - ) - - # There are 8 cells in total (4 from the first and 4 from the second datasets). They all belong to homo_sapiens - human_obs = census[CENSUS_DATA_NAME]["homo_sapiens"]["obs"].read().concat().to_pandas() - assert human_obs.shape[0] == 8 - - # mus_musculus should have 0 cells - mouse_obs = census[CENSUS_DATA_NAME]["mus_musculus"]["obs"].read().concat().to_pandas() - assert mouse_obs.shape[0] == 0 - - # There are only 4 unique genes - var = census[CENSUS_DATA_NAME]["homo_sapiens"]["ms"]["RNA"]["var"].read().concat().to_pandas() - assert var.shape[0] == 4 - - # There should be 2 datasets - returned_datasets = census[CENSUS_INFO_NAME]["datasets"].read().concat().to_pandas() - assert returned_datasets.shape[0] == 2 - assert list(returned_datasets["dataset_id"]) == ["first_id", "second_id"] + ) as census: + # There are 8 cells in total (4 from the first and 4 from the second datasets). 
They all belong to homo_sapiens
+    human_obs = census[CENSUS_DATA_NAME]["homo_sapiens"]["obs"].read().concat().to_pandas()
+    assert human_obs.shape[0] == 8
+    assert list(np.unique(human_obs["dataset_id"])) == ["homo_sapiens_0", "homo_sapiens_1"]
+
+    # mus_musculus should have 8 cells
+    mouse_obs = census[CENSUS_DATA_NAME]["mus_musculus"]["obs"].read().concat().to_pandas()
+    assert mouse_obs.shape[0] == 8
+    assert list(np.unique(mouse_obs["dataset_id"])) == ["mus_musculus_0", "mus_musculus_1"]
+
+    # There are only 4 unique genes
+    var = census[CENSUS_DATA_NAME]["homo_sapiens"]["ms"]["RNA"]["var"].read().concat().to_pandas()
+    assert var.shape[0] == 4
+    assert all(var["feature_id"].str.startswith("homo_sapiens"))
+
+    var = census[CENSUS_DATA_NAME]["mus_musculus"]["ms"]["RNA"]["var"].read().concat().to_pandas()
+    assert var.shape[0] == 4
+    assert all(var["feature_id"].str.startswith("mus_musculus"))
+
+    # There should be 4 total datasets
+    returned_datasets = census[CENSUS_INFO_NAME]["datasets"].read().concat().to_pandas()
+    assert returned_datasets.shape[0] == 4
+    assert list(returned_datasets["dataset_id"]) == [
+        "homo_sapiens_0",
+        "homo_sapiens_1",
+        "mus_musculus_0",
+        "mus_musculus_1",
+    ]
+
+    # Presence matrix should exist with the correct dimensions
+    for exp_name in ["homo_sapiens", "mus_musculus"]:
+        fdpm = census[CENSUS_DATA_NAME][exp_name].ms[MEASUREMENT_RNA_NAME][FEATURE_DATASET_PRESENCE_MATRIX_NAME]
+        fdpm_df = fdpm.read().tables().concat().to_pandas()
+        n_datasets = fdpm_df["soma_dim_0"].nunique()
+        n_features = fdpm_df["soma_dim_1"].nunique()
+        assert n_datasets == 2
+        assert n_features == 4
+        assert fdpm.nnz == 8


 def test_unicode_support(tmp_path: pathlib.Path) -> None:
@@ -72,11 +95,12 @@ def test_unicode_support(tmp_path: pathlib.Path) -> None:
     """
     pd_df = pd.DataFrame(data={"value": ["Ünicode", "S̈upport"]}, columns=["value"])
     pd_df["soma_joinid"] = pd_df.index
-    s_df = soma.DataFrame(uri=os.path.join(tmp_path, "unicode_support")).create(
-        pa.Schema.from_pandas(pd_df, preserve_index=False), index_column_names=["soma_joinid"]
-    )
-    s_df.write(pa.Table.from_pandas(pd_df, preserve_index=False))
-
-    pd_df_in = soma.DataFrame(uri=os.path.join(tmp_path, "unicode_support")).read().concat().to_pandas()
-
-    assert pd_df_in["value"].to_list() == ["Ünicode", "S̈upport"]
+    with soma.DataFrame.create(
+        uri=os.path.join(tmp_path, "unicode_support"),
+        schema=pa.Schema.from_pandas(pd_df, preserve_index=False),
+        index_column_names=["soma_joinid"],
+    ) as s_df:
+        s_df.write(pa.Table.from_pandas(pd_df, preserve_index=False))
+
+    with soma.DataFrame.open(uri=os.path.join(tmp_path, "unicode_support")) as pd_df_in:
+        assert pd_df_in.read().concat().to_pandas()["value"].to_list() == ["Ünicode", "S̈upport"]
diff --git a/tools/cell_census_builder/validate.py b/tools/cell_census_builder/validate.py
index 5febc3fbc..60dc50a7b 100644
--- a/tools/cell_census_builder/validate.py
+++ b/tools/cell_census_builder/validate.py
@@ -16,7 +16,7 @@
 from .anndata import make_anndata_cell_filter, open_anndata
 from .datasets import Dataset
-from .experiment_builder import ExperimentBuilder
+from .experiment_builder import ExperimentBuilder, reopen_experiment_builders
 from .globals import (
     CENSUS_DATA_NAME,
     CENSUS_DATASETS_COLUMNS,
@@ -34,11 +34,11 @@
     FEATURE_DATASET_PRESENCE_MATRIX_NAME,
     SOMA_TileDB_Context,
 )
-from .mp import create_process_pool_executor
+from .mp import create_process_pool_executor, log_on_broken_process_pool
 from .util import uricat


-@dataclass
+@dataclass  # TODO: use attrs
 class EbInfo:
     """Class used to collect information about axis (for validation code)"""
@@ -66,101 +66,103 @@ def validate_all_soma_objects_exist(soma_path: str, experiment_builders: List[Ex
     |   +-- homo_sapiens: soma.Experiment
     |   +-- mus_musculus: soma.Experiment
     """
-    census = soma.Collection(soma_path, context=SOMA_TileDB_Context())
-    assert census.exists() and census.soma_type == "SOMACollection"
-    assert "cxg_schema_version" in census.metadata and census.metadata["cxg_schema_version"] == CXG_SCHEMA_VERSION
-    assert (
-        "census_schema_version" in census.metadata and census.metadata["census_schema_version"] == CENSUS_SCHEMA_VERSION
-    )
-    assert "created_on" in census.metadata and datetime.fromisoformat(census.metadata["created_on"])
-    assert "git_commit_sha" in census.metadata
-
-    for name in [CENSUS_INFO_NAME, CENSUS_DATA_NAME]:
-        assert name in census
-        assert census[name].soma_type == "SOMACollection"
-        assert census[name].exists()
-
-    census_info = census[CENSUS_INFO_NAME]
-    for name in [CENSUS_DATASETS_NAME, CENSUS_SUMMARY_NAME, CENSUS_SUMMARY_CELL_COUNTS_NAME]:
-        assert name in census_info, f"`{name}` missing from census_info"
-        assert census_info[name].soma_type == "SOMADataFrame"
-        assert census_info[name].exists()
-
-    assert sorted(census_info[CENSUS_DATASETS_NAME].keys()) == sorted(CENSUS_DATASETS_COLUMNS + ["soma_joinid"])
-    assert sorted(census_info[CENSUS_SUMMARY_CELL_COUNTS_NAME].keys()) == sorted(
-        list(CENSUS_SUMMARY_CELL_COUNTS_COLUMNS) + ["soma_joinid"]
-    )
-    assert sorted(census_info[CENSUS_SUMMARY_NAME].keys()) == sorted(["label", "value", "soma_joinid"])
-
-    # there should be an experiment for each builder
-    census_data = census[CENSUS_DATA_NAME]
-    for eb in experiment_builders:
-        assert (
-            eb.name in census_data
-            and census_data[eb.name].exists()
-            and census_data[eb.name].soma_type == "SOMAExperiment"
+    with soma.Collection.open(soma_path, context=SOMA_TileDB_Context()) as census:
+        assert census.soma_type == "SOMACollection"
+        assert soma.Collection.exists(census.uri)
+        assert census.metadata["cxg_schema_version"] == CXG_SCHEMA_VERSION
+        assert census.metadata["census_schema_version"] == CENSUS_SCHEMA_VERSION
+        assert datetime.fromisoformat(census.metadata["created_on"])
+        assert "git_commit_sha" in census.metadata
+
+        for name in [CENSUS_INFO_NAME, CENSUS_DATA_NAME]:
+            assert soma.Collection.exists(census[name].uri)
+            assert census[name].soma_type == "SOMACollection"
+
+        census_info = census[CENSUS_INFO_NAME]
+        for name in [CENSUS_DATASETS_NAME, CENSUS_SUMMARY_NAME, CENSUS_SUMMARY_CELL_COUNTS_NAME]:
+            assert name in census_info, f"`{name}` missing from census_info"
+            assert soma.DataFrame.exists(census_info[name].uri)
+            assert census_info[name].soma_type == "SOMADataFrame"
+
+        assert sorted(census_info[CENSUS_DATASETS_NAME].keys()) == sorted(CENSUS_DATASETS_COLUMNS + ["soma_joinid"])
+        assert sorted(census_info[CENSUS_SUMMARY_CELL_COUNTS_NAME].keys()) == sorted(
+            list(CENSUS_SUMMARY_CELL_COUNTS_COLUMNS) + ["soma_joinid"]
         )
+        assert sorted(census_info[CENSUS_SUMMARY_NAME].keys()) == sorted(["label", "value", "soma_joinid"])

-        e = census_data[eb.name]
-        assert "obs" in e and e.obs.exists() and e.obs.soma_type == "SOMADataFrame"
-        assert "ms" in e and e.ms.exists() and e.ms.soma_type == "SOMACollection"
-
-        # there should be a single measurement called 'RNA'
-        assert "RNA" in e.ms and e.ms["RNA"].exists() and e.ms["RNA"].soma_type == "SOMAMeasurement"
-
-        # The measurement should contain all X layers where n_obs > 0 (existence checked elsewhere)
-        rna = e.ms["RNA"]
-        assert "var" in rna and rna["var"].exists() and rna["var"].soma_type == "SOMADataFrame"
-        assert "X" in rna and rna["X"].exists() and rna["X"].soma_type == "SOMACollection"
-        for lyr in CENSUS_X_LAYERS:
-            # layers only exist if there are cells in the measurement
-            if lyr in rna.X:
-                assert rna.X[lyr].exists() and rna.X[lyr].soma_type == "SOMASparseNDArray"
-
-        # and a dataset presence matrix
-        # dataset presence only exists if there are cells in the measurement
-        if FEATURE_DATASET_PRESENCE_MATRIX_NAME in rna:
-            assert rna[FEATURE_DATASET_PRESENCE_MATRIX_NAME].exists()
-            assert rna[FEATURE_DATASET_PRESENCE_MATRIX_NAME].soma_type == "SOMASparseNDArray"
-            # TODO(atolopko): validate 1) shape, 2) joinids exist in datsets and var
+        # there should be an experiment for each builder
+        census_data = census[CENSUS_DATA_NAME]
+        for eb in experiment_builders:
+            assert soma.Experiment.exists(census_data[eb.name].uri)
+            assert census_data[eb.name].soma_type == "SOMAExperiment"
+
+            e = census_data[eb.name]
+            assert soma.DataFrame.exists(e.obs.uri)
+            assert e.obs.soma_type == "SOMADataFrame"
+            assert soma.Collection.exists(e.ms.uri)
+            assert e.ms.soma_type == "SOMACollection"
+
+            # there should be a single measurement called 'RNA'
+            assert soma.Measurement.exists(e.ms["RNA"].uri)
+            assert e.ms["RNA"].soma_type == "SOMAMeasurement"
+
+            # The measurement should contain all X layers where n_obs > 0 (existence checked elsewhere)
+            rna = e.ms["RNA"]
+            assert soma.DataFrame.exists(rna["var"].uri)
+            assert rna["var"].soma_type == "SOMADataFrame"
+            assert soma.Collection.exists(rna["X"].uri)
+            assert rna["X"].soma_type == "SOMACollection"
+            for lyr in CENSUS_X_LAYERS:
+                # layers only exist if there are cells in the measurement
+                if lyr in rna.X:
+                    assert soma.SparseNDArray.exists(rna.X[lyr].uri)
+                    assert rna.X[lyr].soma_type == "SOMASparseNDArray"
+
+            # and a dataset presence matrix
+            # dataset presence only exists if there are cells in the measurement
+            if eb.n_obs > 0:
+                assert soma.SparseNDArray.exists(rna[FEATURE_DATASET_PRESENCE_MATRIX_NAME].uri)
+                assert rna[FEATURE_DATASET_PRESENCE_MATRIX_NAME].soma_type == "SOMASparseNDArray"
+                assert sum([c.non_zero_length for c in rna["feature_dataset_presence_matrix"].read().coos()]) > 0
+                # TODO(atolopko): validate 1) shape, 2) joinids exist in datasets and var

-    return True
+        return True


 def _validate_axis_dataframes(args: Tuple[str, str, Dataset, List[ExperimentBuilder]]) -> Dict[str, EbInfo]:
     assets_path, soma_path, dataset, experiment_builders = args
-    census = soma.Collection(soma_path, context=SOMA_TileDB_Context())
-    census_data = census[CENSUS_DATA_NAME]
-    dataset_id = dataset.dataset_id
-    _, unfiltered_ad = next(open_anndata(assets_path, [dataset], backed="r"))
-    eb_info: Dict[str, EbInfo] = {}
-    for eb in experiment_builders:
-        eb_info[eb.name] = EbInfo()
-        anndata_cell_filter = make_anndata_cell_filter(eb.anndata_cell_filter_spec)
-        se = census_data[eb.name]
-        ad = anndata_cell_filter(unfiltered_ad, retain_X=False)
-        dataset_obs = (
-            se.obs.read(
-                column_names=list(CENSUS_OBS_TERM_COLUMNS),
-                value_filter=f"dataset_id == '{dataset_id}'",
+    with soma.Collection.open(soma_path, context=SOMA_TileDB_Context()) as census:
+        census_data = census[CENSUS_DATA_NAME]
+        dataset_id = dataset.dataset_id
+        _, unfiltered_ad = next(open_anndata(assets_path, [dataset], backed="r"))
+        eb_info: Dict[str, EbInfo] = {}
+        for eb in experiment_builders:
+            eb_info[eb.name] = EbInfo()
+            anndata_cell_filter = make_anndata_cell_filter(eb.anndata_cell_filter_spec)
+            se = census_data[eb.name]
+            ad = anndata_cell_filter(unfiltered_ad, retain_X=False)
+            dataset_obs = (
+                se.obs.read(
+                    column_names=list(CENSUS_OBS_TERM_COLUMNS),
+                    value_filter=f"dataset_id == '{dataset_id}'",
+                )
+                .concat()
+                .to_pandas()
+                .drop(columns=["dataset_id", "tissue_general", "tissue_general_ontology_term_id"])
+                .sort_values(by="soma_joinid")
+                .drop(columns=["soma_joinid"])
+                .reset_index(drop=True)
             )
-            .concat()
-            .to_pandas()
-            .drop(columns=["dataset_id", "tissue_general", "tissue_general_ontology_term_id"])
-            .sort_values(by="soma_joinid")
-            .drop(columns=["soma_joinid"])
-            .reset_index(drop=True)
-        )
-        assert len(dataset_obs) == len(ad.obs), f"{dataset.dataset_id}/{eb.name} obs length mismatch"
-        if ad.n_obs > 0:
-            eb_info[eb.name].n_obs += ad.n_obs
-            eb_info[eb.name].dataset_ids.add(dataset_id)
-            eb_info[eb.name].vars |= set(ad.var.index.array)
-        ad_obs = ad.obs[list(CXG_OBS_TERM_COLUMNS)].reset_index(drop=True)
-        assert (dataset_obs == ad_obs).all().all(), f"{dataset.dataset_id}/{eb.name} obs content, mismatch"
+            assert len(dataset_obs) == len(ad.obs), f"{dataset.dataset_id}/{eb.name} obs length mismatch"
+            if ad.n_obs > 0:
+                eb_info[eb.name].n_obs += ad.n_obs
+                eb_info[eb.name].dataset_ids.add(dataset_id)
+                eb_info[eb.name].vars |= set(ad.var.index.array)
+            ad_obs = ad.obs[list(CXG_OBS_TERM_COLUMNS)].reset_index(drop=True)
+            assert (dataset_obs == ad_obs).all().all(), f"{dataset.dataset_id}/{eb.name} obs content, mismatch"

-    return eb_info
+        return eb_info


 def validate_axis_dataframes(
@@ -176,25 +178,29 @@ def validate_axis_dataframes(
     Raises on error. Returns True on success.
     """
     logging.debug("validate_axis_dataframes")
-    census = soma.Collection(soma_path, context=SOMA_TileDB_Context())
-    census_data = census[CENSUS_DATA_NAME]
+    with soma.Collection.open(soma_path, context=SOMA_TileDB_Context()) as census:
+        census_data = census[CENSUS_DATA_NAME]

-    # check schema
-    expected_obs_columns = CENSUS_OBS_TERM_COLUMNS
-    expected_var_columns = CENSUS_VAR_TERM_COLUMNS
-    for eb in experiment_builders:
-        obs = census_data[eb.name].obs
-        var = census_data[eb.name].ms["RNA"].var
-        assert sorted(obs.keys()) == sorted(expected_obs_columns.keys())
-        assert sorted(var.keys()) == sorted(expected_var_columns.keys())
-        for field in obs.schema:
-            assert field.name in expected_obs_columns
-            assert field.type == expected_obs_columns[field.name], f"Unexpected type in {field.name}: {field.type}"
-        for field in var.schema:
-            assert field.name in expected_var_columns
-            assert field.type == expected_var_columns[field.name], f"Unexpected type in {field.name}: {field.type}"
+        # check schema
+        expected_obs_columns = CENSUS_OBS_TERM_COLUMNS
+        expected_var_columns = CENSUS_VAR_TERM_COLUMNS
+        for eb in experiment_builders:
+            obs = census_data[eb.name].obs
+            var = census_data[eb.name].ms["RNA"].var
+            assert sorted(obs.keys()) == sorted(expected_obs_columns.keys())
+            assert sorted(var.keys()) == sorted(expected_var_columns.keys())
+            for field in obs.schema:
+                assert field.type == expected_obs_columns[field.name], f"Unexpected type in {field.name}: {field.type}"
+            for field in var.schema:
+                assert field.type == expected_var_columns[field.name], f"Unexpected type in {field.name}: {field.type}"

     # check shapes & perform weak test of contents
+
+    # clear the TileDBObject instance variables to avoid pickling error
+    for eb in experiment_builders:
+        eb.experiment = None
+        eb.presence = {}
+
     eb_info = {eb.name: EbInfo() for eb in experiment_builders}
     if args.multi_process:
         with create_process_pool_executor(args) as ppe:
@@ -215,16 +221,17 @@ def validate_axis_dataframes(
                 eb_info[eb_name].update(ebi)
                 logging.info(f"validate_axis {n} of {len(datasets)} complete.")

-    for eb in experiment_builders:
-        se = census_data[eb.name]
+    for eb in reopen_experiment_builders(experiment_builders, "r"):
+        assert eb.experiment is not None
+        exp: soma.Experiment = eb.experiment
         n_vars = len(eb_info[eb.name].vars)
-        census_obs_df = se.obs.read(column_names=["soma_joinid", "dataset_id"]).concat().to_pandas()
+        census_obs_df = exp.obs.read(column_names=["soma_joinid", "dataset_id"]).concat().to_pandas()
         assert eb_info[eb.name].n_obs == len(census_obs_df)
         assert (len(census_obs_df) == 0) or (census_obs_df.soma_joinid.max() + 1 == eb_info[eb.name].n_obs)
         assert eb_info[eb.name].dataset_ids == set(census_obs_df.dataset_id.unique())

-        census_var_df = se.ms["RNA"].var.read(column_names=["feature_id", "soma_joinid"]).concat().to_pandas()
+        census_var_df = exp.ms["RNA"].var.read(column_names=["feature_id", "soma_joinid"]).concat().to_pandas()
         assert n_vars == len(census_var_df)
         assert eb_info[eb.name].vars == set(census_var_df.feature_id.array)
         assert (len(census_var_df) == 0) or (census_var_df.soma_joinid.max() + 1 == n_vars)
@@ -246,26 +253,27 @@ def validate_axis_dataframes(
     return True


-def _validate_X_layers_contents_by_dataset(args: Tuple[str, str, Dataset, List[ExperimentBuilder]]) -> bool:
+def _validate_X_layers_contents_by_dataset(args: Tuple[str, Dataset, List[ExperimentBuilder]]) -> bool:
     """
     Validate that a single dataset is correctly represented in the census. Intended to be dispatched from
     validate_X_layers.

-    Currently implements weak tests:
+    Currently, implements weak tests:
     * the nnz is correct
     * there are no zeros explicitly saved (this is mandated by cell census schema)
     """
-    assets_path, soma_path, dataset, experiment_builders = args
-    census = soma.Collection(soma_path, context=SOMA_TileDB_Context())
-    census_data = census[CENSUS_DATA_NAME]
+    assets_path, dataset, experiment_builders = args
     _, unfiltered_ad = next(open_anndata(assets_path, [dataset]))
-    for eb in experiment_builders:
-        se = census_data[eb.name]
+
+    for eb in reopen_experiment_builders(experiment_builders, "r"):
+        assert eb.experiment is not None
+        exp: soma.Experiment = eb.experiment
+
         anndata_cell_filter = make_anndata_cell_filter(eb.anndata_cell_filter_spec)
         ad = anndata_cell_filter(unfiltered_ad, retain_X=True)
         soma_joinids: npt.NDArray[np.int64] = (
-            se.obs.read(
+            exp.obs.read(
                 column_names=["soma_joinid", "dataset_id"], value_filter=f"dataset_id == '{dataset.dataset_id}'"
             )
             .concat()
@@ -275,14 +283,12 @@ def _validate_X_layers_contents_by_dataset(args: Tuple[str, str, Dataset, List[E

         raw_nnz = 0
         if len(soma_joinids) > 0:
-            assert "raw" in se.ms["RNA"].X and se.ms["RNA"].X["raw"].exists()
+            assert soma.SparseNDArray.exists(exp.ms["RNA"].X["raw"].uri)

             def count_elements(arr: soma.SparseNDArray, join_ids: npt.NDArray[np.int64]) -> int:
-                # TODO XXX: Work-around for regression TileDB-SOMA#473
-                # return sum(t.non_zero_length for t in arr.read((join_ids, slice(None))))
-                return sum(t.non_zero_length for t in arr.read((pa.array(join_ids), slice(None))).csrs())
+                return sum(t.non_zero_length for t in arr.read((join_ids, slice(None))).coos())

-            raw_nnz = count_elements(se.ms["RNA"].X["raw"], soma_joinids)
+            raw_nnz = count_elements(exp.ms["RNA"].X["raw"], soma_joinids)

         def count_nonzero(arr: Union[sparse.spmatrix, npt.NDArray[Any]]) -> int:
             """Return _actual_ non-zero count, NOT the stored value count."""
@@ -302,40 +308,37 @@ def count_nonzero(arr: Union[sparse.spmatrix, npt.NDArray[Any]]) -> int:
     return True


-def _validate_X_layer_has_unique_coords(args: Tuple[str, ExperimentBuilder, str, int, int]) -> bool:
+def _validate_X_layer_has_unique_coords(args: Tuple[ExperimentBuilder, str, int, int]) -> bool:
     """Validate that all X layers have no duplicate coordinates"""
-    soma_path, experiment_builder, layer_name, row_range_start, row_range_stop = args
-    census = soma.Collection(soma_path, context=SOMA_TileDB_Context())
-    census_data = census[CENSUS_DATA_NAME]
-    se = census_data[experiment_builder.name]
-    logging.info(
-        f"validate_no_dups_X start, {experiment_builder.name}, {layer_name}, rows [{row_range_start}, {row_range_stop})"
-    )
-    if layer_name not in se.ms["RNA"].X:
-        return True
+    experiment_builder, layer_name, row_range_start, row_range_stop = args
+    with soma.Experiment.open(experiment_builder.experiment_uri, context=SOMA_TileDB_Context()) as exp:
+        logging.info(
+            f"validate_no_dups_X start, {experiment_builder.name}, {layer_name}, rows [{row_range_start}, {row_range_stop})"
+        )
+        if layer_name not in exp.ms["RNA"].X:
+            return True

-    X_layer = se.ms["RNA"].X[layer_name]
-    n_rows, n_cols = X_layer.shape
-    ROW_SLICE_SIZE = 100_000
+        X_layer = exp.ms["RNA"].X[layer_name]
+        n_rows, n_cols = X_layer.shape
+        ROW_SLICE_SIZE = 100_000

-    for row in range(row_range_start, min(row_range_stop, n_rows), ROW_SLICE_SIZE):
-        # work around TileDB-SOMA bug #900 which errors if we slice beyond end of shape.
-        # TODO: remove when issue is resolved.
-        end_row = min(row + ROW_SLICE_SIZE, X_layer.shape[0] - 1)
+        for row in range(row_range_start, min(row_range_stop, n_rows), ROW_SLICE_SIZE):
+            # work around TileDB-SOMA bug #900 which errors if we slice beyond end of shape.
+            # TODO: remove when issue is resolved.
+            end_row = min(row + ROW_SLICE_SIZE, X_layer.shape[0] - 1)

-        slice_of_X = X_layer.read(coords=(slice(row, end_row),)).tables().concat()
+            slice_of_X = X_layer.read(coords=(slice(row, end_row),)).tables().concat()

-        # Use C layout offset for unique test
-        offsets = (slice_of_X["soma_dim_0"].to_numpy() * n_cols) + slice_of_X["soma_dim_1"].to_numpy()
-        unique_offsets = np.unique(offsets)
-        assert len(offsets) == len(unique_offsets)
+            # Use C layout offset for unique test
+            offsets = (slice_of_X["soma_dim_0"].to_numpy() * n_cols) + slice_of_X["soma_dim_1"].to_numpy()
+            unique_offsets = np.unique(offsets)
+            assert len(offsets) == len(unique_offsets)

-    return True
+        return True


 def validate_X_layers(
     assets_path: str,
-    soma_path: str,
     datasets: List[Dataset],
     experiment_builders: List[ExperimentBuilder],
     args: argparse.Namespace,
@@ -346,51 +349,62 @@ def validate_X_layers(
     Raises on error. Returns True on success.
""" logging.debug("validate_X_layers") - census = soma.Collection(soma_path, context=SOMA_TileDB_Context()) - census_data = census[CENSUS_DATA_NAME] + for eb in reopen_experiment_builders(experiment_builders, "r"): + assert eb.experiment is not None + exp = eb.experiment - for eb in experiment_builders: - se = census_data[eb.name] - assert se.ms["RNA"].X.exists() + assert soma.Collection.exists(exp.ms["RNA"].X.uri) - census_obs_df = se.obs.read(column_names=["soma_joinid"]).concat().to_pandas() + census_obs_df = exp.obs.read(column_names=["soma_joinid"]).concat().to_pandas() n_obs = len(census_obs_df) - assert eb.n_obs == n_obs - census_var_df = se.ms["RNA"].var.read(column_names=["feature_id", "soma_joinid"]).concat().to_pandas() + logging.info(f"uri = {exp.obs.uri}, eb.n_obs = {eb.n_obs}; n_obs = {n_obs}") + # TODO: Only works when run as builder, not standalone validator + # https://github.com/chanzuckerberg/cell-census/issues/179 + if eb.build_completed: + assert eb.n_obs == n_obs + census_var_df = exp.ms["RNA"].var.read(column_names=["feature_id", "soma_joinid"]).concat().to_pandas() n_vars = len(census_var_df) - assert eb.n_var == n_vars + # TODO: Only works when run as builder, not standalone validator + # https://github.com/chanzuckerberg/cell-census/issues/179 + if eb.build_completed: + assert eb.n_var == n_vars if n_obs > 0: for lyr in CENSUS_X_LAYERS: - assert se.ms["RNA"].X[lyr].exists() - X = se.ms["RNA"].X[lyr] + assert soma.SparseNDArray.exists(exp.ms["RNA"].X[lyr].uri) + X = exp.ms["RNA"].X[lyr] assert X.schema.field("soma_dim_0").type == pa.int64() assert X.schema.field("soma_dim_1").type == pa.int64() assert X.schema.field("soma_data").type == CENSUS_X_LAYERS[lyr] assert X.shape == (n_obs, n_vars) + # clear the TileDBObject instance variables to avoid pickling error + for eb in experiment_builders: + eb.experiment = None + eb.presence = {} + if args.multi_process: with create_process_pool_executor(args) as ppe: ROWS_PER_PROCESS = 1_000_000 dup_coord_futures = [ ppe.submit( _validate_X_layer_has_unique_coords, - (soma_path, eb, layer_name, row_start, row_start + ROWS_PER_PROCESS), + (eb, layer_name, row_start, row_start + ROWS_PER_PROCESS), ) for eb in experiment_builders for layer_name in CENSUS_X_LAYERS - for row_start in range(0, eb.n_obs, ROWS_PER_PROCESS) + for row_start in range(0, n_obs, ROWS_PER_PROCESS) ] per_dataset_futures = [ - ppe.submit( - _validate_X_layers_contents_by_dataset, (assets_path, soma_path, dataset, experiment_builders) - ) + ppe.submit(_validate_X_layers_contents_by_dataset, (assets_path, dataset, experiment_builders)) for dataset in datasets ] for n, future in enumerate(concurrent.futures.as_completed(dup_coord_futures), start=1): + log_on_broken_process_pool(ppe) assert future.result() logging.info(f"validate_no_dups_X {n} of {len(dup_coord_futures)} complete.") for n, future in enumerate(concurrent.futures.as_completed(per_dataset_futures), start=1): + log_on_broken_process_pool(ppe) assert future.result() logging.info(f"validate_X {n} of {len(datasets)} complete.") @@ -398,10 +412,10 @@ def validate_X_layers( for eb in experiment_builders: for layer_name in CENSUS_X_LAYERS: logging.info(f"Validating no duplicate coordinates in X layer {eb.name} layer {layer_name}") - assert _validate_X_layer_has_unique_coords((soma_path, eb, layer_name, 0, eb.n_obs)) + assert _validate_X_layer_has_unique_coords((eb, layer_name, 0, n_obs)) for n, vld in enumerate( ( - _validate_X_layers_contents_by_dataset((assets_path, soma_path, dataset, 
experiment_builders)) + _validate_X_layers_contents_by_dataset((assets_path, dataset, experiment_builders)) for dataset in datasets ), start=1, @@ -415,11 +429,12 @@ def validate_X_layers( def load_datasets_from_census(assets_path: str, soma_path: str) -> List[Dataset]: # Datasets are pulled from the census datasets manifest, validating the SOMA # census against the snapshot assets. - df = soma.Collection(soma_path)[CENSUS_INFO_NAME][CENSUS_DATASETS_NAME].read().concat().to_pandas() - df.drop(columns=["soma_joinid"], inplace=True) - df["corpora_asset_h5ad_uri"] = df.dataset_h5ad_path.map(lambda p: uricat(assets_path, p)) - datasets = Dataset.from_dataframe(df) - return datasets + with soma.Collection.open(soma_path, context=SOMA_TileDB_Context()) as census: + df = census[CENSUS_INFO_NAME][CENSUS_DATASETS_NAME].read().concat().to_pandas() + df.drop(columns=["soma_joinid"], inplace=True) + df["corpora_asset_h5ad_uri"] = df.dataset_h5ad_path.map(lambda p: uricat(assets_path, p)) + datasets = Dataset.from_dataframe(df) + return datasets def validate_manifest_contents(assets_path: str, datasets: List[Dataset]) -> bool: @@ -443,6 +458,15 @@ def validate( logging.info("Validation start") assert soma_path.startswith(assets_path.rsplit("/", maxsplit=1)[0]) assert os.path.exists(soma_path) and os.path.exists(assets_path) + + # HACK: experiment_uri is only initialized in create(), which is only called if census is built before calling + # the validator; fails in validator-only mode + # TODO: This should be handled more elegantly, say, by initializing + # in the ExperimentBuilder constructor. https://github.com/chanzuckerberg/cell-census/issues/179 + for eb in experiment_builders: + if not eb.build_completed: + eb.experiment_uri = f"{soma_path}/{CENSUS_DATA_NAME}/{eb.name}" + assert soma_path.endswith("soma") and assets_path.endswith("h5ads") assert validate_all_soma_objects_exist(soma_path, experiment_builders) @@ -450,6 +474,6 @@ def validate( assert validate_manifest_contents(assets_path, datasets) assert validate_axis_dataframes(assets_path, soma_path, datasets, experiment_builders, args) - assert validate_X_layers(assets_path, soma_path, datasets, experiment_builders, args) - logging.info("Validation success") + assert validate_X_layers(assets_path, datasets, experiment_builders, args) + logging.info("Validation finished (success)") return True diff --git a/tools/scripts/requirements.txt b/tools/scripts/requirements.txt index a24f5482e..57dc5a903 100644 --- a/tools/scripts/requirements.txt +++ b/tools/scripts/requirements.txt @@ -5,7 +5,7 @@ numpy # NOTE: You can also build this dependency from source, per ./notebooks/README.md. # NOTE: The builder's version of tiledbsoma MUST be <= the API's tiledbsoma version, to ensure reader compatibility # of TileDB on-disk storage format -tiledbsoma==0.5.0a6 +tiledbsoma==1.0rc0 # NOTE: tiledb is also a requirement of the builder, but builder must not use a tiledb version that is ahead of # tiledbsoma's tiledb version (so just use the same version) # tiledb
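
Reviewer note (illustrative only, not part of the patch): the validate.py changes above all follow the same tiledbsoma 1.0 idiom adopted in the unicode test: handles returned by create()/open() are used as context managers, existence is checked with the per-type exists(uri) class methods, and reads are materialized with read().concat().to_pandas(). A minimal, self-contained sketch of that pattern, using only the calls that appear in this diff and a hypothetical scratch path:

    import os.path
    import tempfile

    import pandas as pd
    import pyarrow as pa
    import tiledbsoma as soma

    tmp_dir = tempfile.mkdtemp()  # hypothetical scratch location
    uri = os.path.join(tmp_dir, "example_df")

    df = pd.DataFrame(data={"value": ["a", "b"]})
    df["soma_joinid"] = df.index  # int64 join ids, as in the unicode test above

    # create() returns an open handle; writing inside the context ensures it is closed
    with soma.DataFrame.create(
        uri=uri,
        schema=pa.Schema.from_pandas(df, preserve_index=False),
        index_column_names=["soma_joinid"],
    ) as s_df:
        s_df.write(pa.Table.from_pandas(df, preserve_index=False))

    # existence checks are class methods keyed by URI, replacing the old .exists() instance method
    assert soma.DataFrame.exists(uri)

    # open() for reading; read().concat().to_pandas() materializes the result
    with soma.DataFrame.open(uri=uri) as s_df:
        assert s_df.read().concat().to_pandas()["value"].to_list() == ["a", "b"]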