Write dask arrays with 64bit indptr (#1348)
* Write dask arrays with 64bit indptr

* Write both indices and indptr at 64 bit

* easier test

* Overflow checking SparseDataset.append

* Add test

* Release note

* Actually add test

* Update anndata/_io/specs/methods.py

* More specific test

* Check for modification

* Update anndata/_core/sparse_dataset.py

* maybe cheaper operation

* remove dead code
ivirshup authored Mar 1, 2024
1 parent a478647 commit f0d3a6e
Showing 5 changed files with 62 additions and 4 deletions.
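
For orientation before the diffs: a minimal sketch (with illustrative sizes, not values from this commit) of the failure mode being guarded against. The last entry of a CSR/CSC indptr equals the total number of stored values, so once appended or concatenated chunks push that count past the int32 maximum, 32-bit indptr/indices arrays can no longer hold valid offsets.

    import numpy as np

    int32_max = np.iinfo(np.int32).max      # 2_147_483_647
    existing_nnz = 2_000_000_000             # stored values already on disk (illustrative)
    incoming_nnz = 200_000_000               # stored values in the matrix being appended
    new_nnz = existing_nnz + incoming_nnz
    # Same condition the new check in SparseDataset.append uses below:
    print(new_nnz >= int32_max)              # True -> refuse to append, require a 64-bit indptr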
anndata/_core/sparse_dataset.py: 15 changes (13 additions & 2 deletions)
@@ -474,6 +474,15 @@ def append(self, sparse_matrix: ss.spmatrix):
                 f"Matrices must have same format. Currently are "
                 f"{self.format!r} and {get_format(sparse_matrix)!r}"
             )
+        indptr_offset = len(self.group["indices"])
+        if self.group["indptr"].dtype == np.int32:
+            new_nnz = indptr_offset + len(sparse_matrix.indices)
+            if new_nnz >= np.iinfo(np.int32).max:
+                raise OverflowError(
+                    "This array was written with a 32 bit intptr, but is now large "
+                    "enough to require 64 bit values. Please recreate the array with "
+                    "a 64 bit indptr."
+                )

         # shape
         if self.format == "csr":
@@ -501,11 +510,13 @@ def append(self, sparse_matrix: ss.spmatrix):
         # indptr
         indptr = self.group["indptr"]
         orig_data_size = indptr.shape[0]
-        append_offset = indptr[-1]
         indptr.resize((orig_data_size + sparse_matrix.indptr.shape[0] - 1,))
         indptr[orig_data_size:] = (
-            sparse_matrix.indptr[1:].astype(np.int64) + append_offset
+            sparse_matrix.indptr[1:].astype(np.int64) + indptr_offset
         )
+        # Clear cached property
+        if hasattr(self, "indptr"):
+            del self.indptr

         # indices
         indices = self.group["indices"]
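
The hasattr/del pair added above invalidates a cached indptr so the next access re-reads the now longer array from disk. A minimal sketch of that invalidation pattern, assuming indptr is exposed as a functools.cached_property (the Example class is illustrative, not anndata code):

    from functools import cached_property

    class Example:
        loads = 0

        @cached_property
        def indptr(self):
            # stands in for reading the indptr dataset from the h5py/zarr group
            type(self).loads += 1
            return [0, 3, 7]

    obj = Example()
    obj.indptr                  # first access: read and cache in obj.__dict__
    if hasattr(obj, "indptr"):  # the cached value lives in the instance __dict__
        del obj.indptr          # drop it so the next access re-reads
    obj.indptr
    assert Example.loads == 2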
anndata/_io/specs/methods.py: 8 changes (7 additions & 1 deletion)
@@ -562,6 +562,12 @@ def write_sparse_dataset(f, k, elem, _writer, dataset_kwargs=MappingProxyType({}
 )
 def write_dask_sparse(f, k, elem, _writer, dataset_kwargs=MappingProxyType({})):
     sparse_format = elem._meta.format
+
+    def as_int64_indices(x):
+        x.indptr = x.indptr.astype(np.int64, copy=False)
+        x.indices = x.indices.astype(np.int64, copy=False)
+        return x
+
     if sparse_format == "csr":
         axis = 0
     elif sparse_format == "csc":
@@ -583,7 +589,7 @@ def chunk_slice(start: int, stop: int) -> tuple[slice | None, slice | None]:
     _writer.write_elem(
         f,
         k,
-        elem[chunk_slice(chunk_start, chunk_stop)].compute(),
+        as_int64_indices(elem[chunk_slice(chunk_start, chunk_stop)].compute()),
         dataset_kwargs=dataset_kwargs,
     )

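A standalone illustration of what the as_int64_indices helper does to each materialized chunk before it is written: scipy usually builds small matrices with int32 index arrays, and astype(np.int64, copy=False) upcasts them, copying only when the dtype actually changes. This is a sketch with made-up shapes, not the writer itself:

    import numpy as np
    from scipy import sparse

    chunk = sparse.random(100, 50, density=0.1, format="csr")
    print(chunk.indptr.dtype, chunk.indices.dtype)    # typically int32 int32

    chunk.indptr = chunk.indptr.astype(np.int64, copy=False)
    chunk.indices = chunk.indices.astype(np.int64, copy=False)
    print(chunk.indptr.dtype, chunk.indices.dtype)    # int64 int64
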
anndata/tests/test_backed_sparse.py: 39 changes (38 additions & 1 deletion)
@@ -12,7 +12,7 @@
 import anndata as ad
 from anndata._core.anndata import AnnData
 from anndata._core.sparse_dataset import sparse_dataset
-from anndata.experimental import read_dispatched
+from anndata.experimental import read_dispatched, write_elem
 from anndata.tests.helpers import AccessTrackingStore, assert_equal, subset_func

 if TYPE_CHECKING:
@@ -433,3 +433,40 @@ def test_backed_sizeof(
     assert csr_disk.__sizeof__(with_disk=True) == csc_disk.__sizeof__(with_disk=True)
     assert csr_mem.__sizeof__() > csr_disk.__sizeof__()
     assert csr_mem.__sizeof__() > csc_disk.__sizeof__()
+
+
+@pytest.mark.parametrize(
+    "group_fn",
+    [
+        pytest.param(lambda _: zarr.group(), id="zarr"),
+        pytest.param(lambda p: h5py.File(p / "test.h5", mode="a"), id="h5py"),
+    ],
+)
+def test_append_overflow_check(group_fn, tmpdir):
+    group = group_fn(tmpdir)
+    typemax_int32 = np.iinfo(np.int32).max
+    orig_mtx = sparse.csr_matrix(np.ones((1, 1), dtype=bool))
+    # Minimally allocating new matrix
+    new_mtx = sparse.csr_matrix(
+        (
+            np.broadcast_to(True, typemax_int32 - 1),
+            np.broadcast_to(np.int32(1), typemax_int32 - 1),
+            [0, typemax_int32 - 1],
+        ),
+        shape=(1, 2),
+    )
+
+    write_elem(group, "mtx", orig_mtx)
+    backed = sparse_dataset(group["mtx"])
+
+    # Checking for correct caching behaviour
+    backed.indptr
+
+    with pytest.raises(
+        OverflowError,
+        match=r"This array was written with a 32 bit intptr, but is now large.*",
+    ):
+        backed.append(new_mtx)
+
+    # Check for any modification
+    assert_equal(backed, orig_mtx)
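
The "Minimally allocating new matrix" comment works because np.broadcast_to returns read-only, zero-stride views: the roughly 2.1 billion stored values and column indices are described without per-element memory. A quick sketch of that property, mirroring the test's construction:

    import numpy as np

    n = np.iinfo(np.int32).max - 1              # 2_147_483_646 logical elements
    data = np.broadcast_to(True, n)             # all-True bool view
    indices = np.broadcast_to(np.int32(1), n)   # column index 1, repeated
    print(data.strides, indices.strides)        # (0,) (0,) -> one backing scalar each
    print(data.flags.writeable)                 # False; the test wraps these views in a csr_matrix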
anndata/tests/test_io_elementwise.py: 3 changes (3 additions & 0 deletions)
@@ -147,6 +147,9 @@ def test_dask_write_sparse(store, sparse_format):
     assert_equal(X_from_disk, X_dask_from_disk)
     assert_equal(dict(store["X"].attrs), dict(store["X_dask"].attrs))
+
+    assert store["X_dask/indptr"].dtype == np.int64
+    assert store["X_dask/indices"].dtype == np.int64


 def test_io_spec_raw(store):
     adata = gen_adata((3, 2))
docs/release-notes/0.10.6.md: 1 change (1 addition & 0 deletions)
@@ -7,6 +7,7 @@
 * Writing a dataframe with non-unique column names now throws an error, instead of silently overwriting {pr}`1335` {user}`ivirshup`
 * Bring optimization from {pr}`1233` to indexing on the whole `AnnData` object, not just the sparse dataset itself {pr}`1365` {user}`ilan-gold`
 * Fix mean slice length checking to use improved performance when indexing backed sparse matrices with boolean masks along their major axis {pr}`1366` {user}`ilan-gold`
+* Fixed overflow occurring when writing dask arrays with sparse chunks by always writing dask arrays with 64 bit indptr and indices, and adding an overflow check to `.append` method of sparse on disk structures {pr}`1348` {user}`ivirshup`

 ```{rubric} Documentation
 ```
