From 9694e21b67e65590323941c34e0dd09c5be781c6 Mon Sep 17 00:00:00 2001
From: Isaac Virshup
Date: Sat, 2 Mar 2024 02:02:19 +1100
Subject: [PATCH] Backport PR #1348: Write dask arrays with 64bit indptr

---
 anndata/_core/sparse_dataset.py      | 15 +++++++++--
 anndata/_io/specs/methods.py         |  8 +++++-
 anndata/tests/test_backed_sparse.py  | 39 +++++++++++++++++++++++++++-
 anndata/tests/test_io_elementwise.py |  3 +++
 docs/release-notes/0.10.6.md         |  1 +
 5 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/anndata/_core/sparse_dataset.py b/anndata/_core/sparse_dataset.py
index 6dfb89745..53743384d 100644
--- a/anndata/_core/sparse_dataset.py
+++ b/anndata/_core/sparse_dataset.py
@@ -474,6 +474,15 @@ def append(self, sparse_matrix: ss.spmatrix):
                 f"Matrices must have same format. Currently are "
                 f"{self.format!r} and {get_format(sparse_matrix)!r}"
             )
+        indptr_offset = len(self.group["indices"])
+        if self.group["indptr"].dtype == np.int32:
+            new_nnz = indptr_offset + len(sparse_matrix.indices)
+            if new_nnz >= np.iinfo(np.int32).max:
+                raise OverflowError(
+                    "This array was written with a 32 bit intptr, but is now large "
+                    "enough to require 64 bit values. Please recreate the array with "
+                    "a 64 bit indptr."
+                )
 
         # shape
         if self.format == "csr":
@@ -501,11 +510,13 @@ def append(self, sparse_matrix: ss.spmatrix):
         # indptr
         indptr = self.group["indptr"]
         orig_data_size = indptr.shape[0]
-        append_offset = indptr[-1]
         indptr.resize((orig_data_size + sparse_matrix.indptr.shape[0] - 1,))
         indptr[orig_data_size:] = (
-            sparse_matrix.indptr[1:].astype(np.int64) + append_offset
+            sparse_matrix.indptr[1:].astype(np.int64) + indptr_offset
         )
+        # Clear cached property
+        if hasattr(self, "indptr"):
+            del self.indptr
 
         # indices
         indices = self.group["indices"]
diff --git a/anndata/_io/specs/methods.py b/anndata/_io/specs/methods.py
index 70bd36945..b63a7d04f 100644
--- a/anndata/_io/specs/methods.py
+++ b/anndata/_io/specs/methods.py
@@ -562,6 +562,12 @@ def write_sparse_dataset(f, k, elem, _writer, dataset_kwargs=MappingProxyType({}
 )
 def write_dask_sparse(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})):
     sparse_format = elem._meta.format
+
+    def as_int64_indices(x):
+        x.indptr = x.indptr.astype(np.int64, copy=False)
+        x.indices = x.indices.astype(np.int64, copy=False)
+        return x
+
     if sparse_format == "csr":
         axis = 0
     elif sparse_format == "csc":
@@ -583,7 +589,7 @@ def chunk_slice(start: int, stop: int) -> tuple[slice | None, slice | None]:
         _writer.write_elem(
             f,
             k,
-            elem[chunk_slice(chunk_start, chunk_stop)].compute(),
+            as_int64_indices(elem[chunk_slice(chunk_start, chunk_stop)].compute()),
             dataset_kwargs=dataset_kwargs,
         )
 
diff --git a/anndata/tests/test_backed_sparse.py b/anndata/tests/test_backed_sparse.py
index 7ce6860d1..478b2a0e8 100644
--- a/anndata/tests/test_backed_sparse.py
+++ b/anndata/tests/test_backed_sparse.py
@@ -12,7 +12,7 @@
 import anndata as ad
 from anndata._core.anndata import AnnData
 from anndata._core.sparse_dataset import sparse_dataset
-from anndata.experimental import read_dispatched
+from anndata.experimental import read_dispatched, write_elem
 from anndata.tests.helpers import AccessTrackingStore, assert_equal, subset_func
 
 if TYPE_CHECKING:
@@ -433,3 +433,40 @@ def test_backed_sizeof(
     assert csr_disk.__sizeof__(with_disk=True) == csc_disk.__sizeof__(with_disk=True)
     assert csr_mem.__sizeof__() > csr_disk.__sizeof__()
     assert csr_mem.__sizeof__() > csc_disk.__sizeof__()
+
+
+@pytest.mark.parametrize(
+    "group_fn",
+    [
+        pytest.param(lambda _: zarr.group(), id="zarr"),
+        pytest.param(lambda p: h5py.File(p / "test.h5", mode="a"), id="h5py"),
+    ],
+)
+def test_append_overflow_check(group_fn, tmpdir):
+    group = group_fn(tmpdir)
+    typemax_int32 = np.iinfo(np.int32).max
+    orig_mtx = sparse.csr_matrix(np.ones((1, 1), dtype=bool))
+    # Minimally allocating new matrix
+    new_mtx = sparse.csr_matrix(
+        (
+            np.broadcast_to(True, typemax_int32 - 1),
+            np.broadcast_to(np.int32(1), typemax_int32 - 1),
+            [0, typemax_int32 - 1],
+        ),
+        shape=(1, 2),
+    )
+
+    write_elem(group, "mtx", orig_mtx)
+    backed = sparse_dataset(group["mtx"])
+
+    # Checking for correct caching behaviour
+    backed.indptr
+
+    with pytest.raises(
+        OverflowError,
+        match=r"This array was written with a 32 bit intptr, but is now large.*",
+    ):
+        backed.append(new_mtx)
+
+    # Check for any modification
+    assert_equal(backed, orig_mtx)
diff --git a/anndata/tests/test_io_elementwise.py b/anndata/tests/test_io_elementwise.py
index 7f7dac4dd..4ed4dddab 100644
--- a/anndata/tests/test_io_elementwise.py
+++ b/anndata/tests/test_io_elementwise.py
@@ -147,6 +147,9 @@ def test_dask_write_sparse(store, sparse_format):
     assert_equal(X_from_disk, X_dask_from_disk)
     assert_equal(dict(store["X"].attrs), dict(store["X_dask"].attrs))
 
+    assert store["X_dask/indptr"].dtype == np.int64
+    assert store["X_dask/indices"].dtype == np.int64
+
 
 def test_io_spec_raw(store):
     adata = gen_adata((3, 2))
diff --git a/docs/release-notes/0.10.6.md b/docs/release-notes/0.10.6.md
index e8618118a..814e8c79d 100644
--- a/docs/release-notes/0.10.6.md
+++ b/docs/release-notes/0.10.6.md
@@ -7,6 +7,7 @@
 * Writing a dataframe with non-unique column names now throws an error, instead of silently overwriting {pr}`1335` {user}`ivirshup`
 * Bring optimization from {pr}`1233` to indexing on the whole `AnnData` object, not just the sparse dataset itself {pr}`1365` {user}`ilan-gold`
 * Fix mean slice length checking to use improved performance when indexing backed sparse matrices with boolean masks along their major axis {pr}`1366` {user}`ilan-gold`
+* Fixed overflow occurring when writing dask arrays with sparse chunks by always writing dask arrays with 64 bit indptr and indices, and adding an overflow check to `.append` method of sparse on disk structures {pr}`1348` {user}`ivirshup`
 
 ```{rubric} Documentation
 ```