From 7e82adb288951fa5a40a20be61bb0086101f51d3 Mon Sep 17 00:00:00 2001 From: Ewan Barr Date: Tue, 2 Apr 2024 23:11:35 +0200 Subject: [PATCH 1/5] #33 Implementation of buffered file read (WIP) --- sigpyproc/core/kernels.py | 15 +++---- sigpyproc/io/bits.py | 14 +++++-- sigpyproc/io/fileio.py | 60 +++++++++++++++++++++++++++- sigpyproc/readers.py | 84 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 157 insertions(+), 16 deletions(-) diff --git a/sigpyproc/core/kernels.py b/sigpyproc/core/kernels.py index d7dd748..90b79a0 100644 --- a/sigpyproc/core/kernels.py +++ b/sigpyproc/core/kernels.py @@ -5,10 +5,9 @@ from scipy import constants -@njit("u1[:](u1[:])", cache=True, parallel=True) -def unpack1_8(array): +@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True) +def unpack1_8(array, unpacked): bitfact = 8 - unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8) for ii in prange(array.size): unpacked[ii * bitfact + 0] = (array[ii] >> 7) & 1 unpacked[ii * bitfact + 1] = (array[ii] >> 6) & 1 @@ -21,10 +20,9 @@ def unpack1_8(array): return unpacked -@njit("u1[:](u1[:])", cache=True, parallel=True) -def unpack2_8(array): +@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True) +def unpack2_8(array, unpacked): bitfact = 8 // 2 - unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8) for ii in prange(array.size): unpacked[ii * bitfact + 0] = (array[ii] & 0xC0) >> 6 unpacked[ii * bitfact + 1] = (array[ii] & 0x30) >> 4 @@ -33,10 +31,9 @@ def unpack2_8(array): return unpacked -@njit("u1[:](u1[:])", cache=True, parallel=True) -def unpack4_8(array): +@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True) +def unpack4_8(array, unpacked): bitfact = 8 // 4 - unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8) for ii in prange(array.size): unpacked[ii * bitfact + 0] = (array[ii] & 0xF0) >> 4 unpacked[ii * bitfact + 1] = (array[ii] & 0x0F) >> 0 diff --git a/sigpyproc/io/bits.py b/sigpyproc/io/bits.py index cc22d13..47d5248 100644 --- a/sigpyproc/io/bits.py +++ b/sigpyproc/io/bits.py @@ -9,7 +9,7 @@ nbits_to_dtype = {1: " np.ndarray: +def unpack(array: np.ndarray, nbits: int, unpacked: np.ndarray = None) -> np.ndarray: """Unpack 1, 2 and 4 bit array. Only unpacks in big endian bit ordering. Parameters @@ -30,12 +30,18 @@ def unpack(array: np.ndarray, nbits: int) -> np.ndarray: if nbits is not 1, 2, or 4 """ assert array.dtype == np.uint8, "Array must be uint8" + bitfact = 8//nbits + if unpacked is None: + unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8) + else: + if unpacked.size != array.size * bitfact: + raise ValueError(f"Unpacking array must be {bitfact}x input size") if nbits == 1: - unpacked = np.unpackbits(array, bitorder="big") + unpacked = kernels.unpack1_8(array, unpacked) elif nbits == 2: - unpacked = kernels.unpack2_8(array) + unpacked = kernels.unpack2_8(array, unpacked) elif nbits == 4: - unpacked = kernels.unpack4_8(array) + unpacked = kernels.unpack4_8(array, unpacked) else: raise ValueError("nbits must be 1, 2, or 4") return unpacked diff --git a/sigpyproc/io/fileio.py b/sigpyproc/io/fileio.py index 32ef252..684e65d 100644 --- a/sigpyproc/io/fileio.py +++ b/sigpyproc/io/fileio.py @@ -4,6 +4,14 @@ import warnings import numpy as np +try: + from collections.abc import Buffer +except ImportError: + from typing import Any + class Buffer(Any): + pass + + from sigpyproc.io.bits import BitsInfo, unpack, pack from sigpyproc.utils import get_logger @@ -32,7 +40,15 @@ def _open(self, ifile): self._close_current() self.file_obj = file_obj self.ifile_cur = ifile - + + def eos(self): + """Check if the end of the file stream has been reached""" + # First check if we are at the end of the current file + eof = self.file_obj.tell() == os.fstat(self.file_obj.fileno()).st_size + # Now check if we are at the end of the list of files + eol = self.ifile_cur == len(self.files) - 1 + return eof & eol + def _close_current(self): """Close the currently open local file, and therewith the set.""" if self.ifile_cur is not None: @@ -118,6 +134,48 @@ def cread(self, nunits: int) -> np.ndarray: if self.bitsinfo.unpack: return unpack(data_ar, self.nbits) return data_ar + + def creadinto(self, read_buffer: Buffer, unpack_buffer: Buffer = None) -> int: + """Read from file stream into a buffer of pre-defined length + + Parameters + ---------- + buffer : Buffer + An object exposing the Python Buffer Protocol interface [PEP 3118] + + Returns + ------- + int + The number of bytes readinto the buffer + + Raises + ------ + IOError + if file is closed. + + Detail + ------ + It is the responsibility of the caller to handle the case than fewer bytes + than requested are read into the buffer. When at the end of the file stream + the number of bytes returned will be zero. + """ + if self.file_obj.closed: + raise IOError("Cannot read closed file.") + nbytes = 0 + view = memoryview(read_buffer) + while True: + nbytes += self.file_obj.readinto(view[nbytes:]) + if nbytes == len(read_buffer) or self.eos(): + # We have either filled the buffer or reached the end of the stream + break + else: + self._seek2hdr(self.ifile_cur + 1) + if self.bitsinfo.unpack: + unpack_ar = np.frombuffer(unpack_buffer, dtype=np.uint8) + read_ar = np.frombuffer(read_buffer, dtype=np.uint8) + unpack(read_ar, self.nbits, unpack_ar) + return nbytes + def seek(self, offset: int, whence: int = 0) -> None: """Change the multifile stream position to the given data offset. diff --git a/sigpyproc/readers.py b/sigpyproc/readers.py index 76f5e15..36c15ed 100644 --- a/sigpyproc/readers.py +++ b/sigpyproc/readers.py @@ -2,7 +2,14 @@ import numpy as np import inspect -from collections.abc import Iterator +from collections.abc import Iterator, Callable +try: + from collections.abc import Buffer +except ImportError: + from typing import Any + class Buffer(Any): + pass + from rich.progress import track from sigpyproc.io import pfits @@ -119,7 +126,7 @@ def read_plan( nsamps: int | None = None, skipback: int = 0, description: str | None = None, - quiet: bool = False, + quiet: bool = False ) -> Iterator[tuple[int, int, np.ndarray]]: if nsamps is None: nsamps = self.header.nsamples - start @@ -152,6 +159,79 @@ def read_plan( ) yield block // self.header.nchans, ii, data + def _safe_allocate(self, allocator, nbytes): + buffer = allocator(nbytes) + if len(buffer) != nbytes: + raise ValueError(f"Allocated buffer is not the expected size {len(buffer)} (actual) != {nbytes} (expected)") + return buffer + + def read_plan_buffered( + self, + gulp: int = 16384, + start: int = 0, + nsamps: int | None = None, + skipback: int = 0, + description: str | None = None, + quiet: bool = False, + allocator: Callable[[int], Buffer] = None, + ) -> Iterator[tuple[int, int, np.ndarray]]: + """ + Iterate over the file, reading into a pre-allocated buffer + + Parameters + ---------- + ... + """ + if nsamps is None: + nsamps = self.header.nsamples - start + if description is None: + description = f"{inspect.stack()[1][3]} : " + + gulp = min(nsamps, gulp) + skipback = abs(skipback) + if skipback >= gulp: + raise ValueError(f"readsamps ({gulp}) must be > skipback ({skipback})") + + # Here we set the allocator + allocator = allocator if allocator is not None else bytearray + + # Here we allocate the buffer that will be readinto from file + read_buffer = self._safe_allocate(allocator, gulp * self.sampsize) + # Here we allocate (if needed) a buffer for unpacking the data + if self.bitsinfo.unpack: + # The unpacking always unpacks up to 8-bits, so the size should be nsamps * nchans + # Is there a way to guarantee that the behaviour is correct here? + unpack_buffer = self._safe_allocate(allocator, gulp * self.header.nchans) + data = np.frombuffer(unpack_buffer, dtype=self._file.bitsinfo.dtype) + else: + unpack_buffer = None + data = np.frombuffer(read_buffer, dtype=self._file.bitsinfo.dtype) + + self._file.seek(start * self.sampsize) + nreads, lastread = divmod(nsamps, (gulp - skipback)) + if lastread < skipback: + nreads -= 1 + lastread = nsamps - (nreads * (gulp - skipback)) + blocks = [ + (ii, gulp * self.header.nchans, -skipback * self.header.nchans) + for ii in range(nreads) + ] + if lastread != 0: + blocks.append((nreads, lastread * self.header.nchans, 0)) + + # / self.logger.debug(f"Reading plan: nsamps = {nsamps}, nreads = {nreads}") + # / self.logger.debug(f"Reading plan: gulp = {gulp}, lastread = {lastread}, skipback = {skipback}") + for ii, block, skip in track(blocks, description=description, disable=quiet): + nbytes = self._file.creadinto(read_buffer, unpack_buffer) + expected_nbytes = block * self.sampsize / self.header.nchans + if nbytes != expected_nbytes : + raise ValueError(f"Unexpected number of bytes read from file {nbytes} (actual) != {expected_nbytes} (expected)") + if skip != 0: + self._file.seek( + skip * self.bitsinfo.itemsize // self.bitsinfo.bitfact, whence=1 + ) + yield block // self.header.nchans, ii, data[:block] + class PFITSReader(Filterbank): """ From a79cc170a3a01a534bf60fa612b177185319746c Mon Sep 17 00:00:00 2001 From: Ewan Barr Date: Wed, 3 Apr 2024 00:05:39 +0200 Subject: [PATCH 2/5] #33 changed to older way of supporting buffer type annotations --- sigpyproc/io/fileio.py | 5 ++--- sigpyproc/readers.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sigpyproc/io/fileio.py b/sigpyproc/io/fileio.py index 684e65d..6d79850 100644 --- a/sigpyproc/io/fileio.py +++ b/sigpyproc/io/fileio.py @@ -7,9 +7,8 @@ try: from collections.abc import Buffer except ImportError: - from typing import Any - class Buffer(Any): - pass + from typing import Any, NewType + Buffer = NewType("Buffer", Any) from sigpyproc.io.bits import BitsInfo, unpack, pack diff --git a/sigpyproc/readers.py b/sigpyproc/readers.py index 36c15ed..38e03ab 100644 --- a/sigpyproc/readers.py +++ b/sigpyproc/readers.py @@ -6,9 +6,8 @@ try: from collections.abc import Buffer except ImportError: - from typing import Any - class Buffer(Any): - pass + from typing import Any, NewType + Buffer = NewType("Buffer", Any) from rich.progress import track From ade309b2d0cfc551e92fe5583d8bc27e644390c4 Mon Sep 17 00:00:00 2001 From: Ewan Barr Date: Wed, 3 Apr 2024 00:06:55 +0200 Subject: [PATCH 3/5] added tests for buffered reader --- tests/conftest.py | 12 +++++++++- tests/test_readers.py | 55 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index e13de89..9004fd2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,6 @@ def tmpfile(tmp_path_factory, content=""): fn.write_text(content) return fn.as_posix() - @pytest.fixture(scope="session", autouse=True) def filfile_1bit(): return Path(_datadir / "parkes_1bit.fil").as_posix() @@ -37,6 +36,17 @@ def filfile_8bit_1(): def filfile_8bit_2(): return Path(_datadir / "parkes_8bit_2.fil").as_posix() +@pytest.fixture(scope="session", autouse=True) +def filterbank_files(): + return [ + Path(_datadir / "parkes_1bit.fil").as_posix(), + Path(_datadir / "parkes_2bit.fil").as_posix(), + Path(_datadir / "parkes_4bit.fil").as_posix(), + [Path(_datadir / "parkes_8bit_1.fil").as_posix(), + Path(_datadir / "parkes_8bit_2.fil").as_posix()], + Path(_datadir / "tutorial.fil").as_posix(), + Path(_datadir / "tutorial_2bit.fil").as_posix(), + ] @pytest.fixture(scope="session", autouse=True) def fitsfile_4bit(): diff --git a/tests/test_readers.py b/tests/test_readers.py index e8fc5d6..be601b1 100644 --- a/tests/test_readers.py +++ b/tests/test_readers.py @@ -1,3 +1,4 @@ +import pytest import numpy as np from sigpyproc.readers import FilReader, PFITSReader, PulseExtractor from sigpyproc.header import Header @@ -51,6 +52,60 @@ def test_read_dedisp_block_outofrange_dm(self, filfile_8bit_1): fil = FilReader(filfile_8bit_1) with np.testing.assert_raises(ValueError): fil.read_dedisp_block(0, 100, 10000) + + def test_read_plan(self, filfile_8bit_1): + fil = FilReader(filfile_8bit_1) + for nsamps, ii, data in fil.read_plan(gulp=512, nsamps=1024): + assert isinstance(data, np.ndarray) + + def test_read_plan(self, filterbank_files): + for filfile in filterbank_files: + fil = FilReader(filfile) + for nsamps, ii, data in fil.read_plan(gulp=512, nsamps=1024): + assert isinstance(data, np.ndarray) + + def test_read_plan_buffered(self, filterbank_files): + for filfile in filterbank_files: + fil = FilReader(filfile) + for nsamps, ii, data in fil.read_plan_buffered(gulp=512, nsamps=1024): + assert isinstance(data, np.ndarray) + + def test_compare_buffered_and_nonbuffered(self, filterbank_files): + def allocator(nbytes): + buffer = np.zeros(nbytes, dtype="ubyte") + return memoryview(buffer) + + for filfile in filterbank_files: + fil0 = FilReader(filfile) + plan0 = fil0.read_plan(quiet=True) + fil1 = FilReader(filfile) + plan1 = fil1.read_plan_buffered(quiet=True, allocator=allocator) + for (nsamps0, ii0, data0), (nsamps1, ii1, data1) in zip(plan0, plan1): + assert nsamps0 == nsamps1 + assert ii0 == ii1 + assert np.array_equal(data0, data1) + + def test_nsamps_eq_skipback(self, filfile_8bit_1): + fil = FilReader(filfile_8bit_1) + with pytest.raises(ValueError): + for nsamps, ii, data in fil.read_plan_buffered(gulp=512, nsamps=1024, skipback=1024): + pass + + def test_read_plan_custom_allocator(self, filfile_8bit_1): + fil = FilReader(filfile_8bit_1) + def allocator(nbytes): + buffer = np.zeros(nbytes, dtype="ubyte") + return memoryview(buffer) + for nsamps, ii, data in fil.read_plan_buffered(allocator=allocator): + pass + + def test_read_plan_invalid_custom_allocator(self, filfile_8bit_1): + fil = FilReader(filfile_8bit_1) + def allocator(): + pass + with pytest.raises(TypeError): + for nsamps, ii, data in fil.read_plan_buffered(allocator=allocator): + pass class TestPFITSReader(object): From e03de442dfd02c68ca87a37f7f981a853c397dbd Mon Sep 17 00:00:00 2001 From: Ewan Barr Date: Wed, 3 Apr 2024 00:13:26 +0200 Subject: [PATCH 4/5] #33 updated doc strings --- sigpyproc/readers.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/sigpyproc/readers.py b/sigpyproc/readers.py index 38e03ab..d4839d8 100644 --- a/sigpyproc/readers.py +++ b/sigpyproc/readers.py @@ -179,7 +179,26 @@ def read_plan_buffered( Parameters ---------- - ... + gulp: int + The number of samples (spectra) to read in each iteration + start: int + The starting sample (spectrum) to read from + nsamps: int + The total number of samples (spectra) to read + skipback: int = 0 + The number of samples (spectra) to seek back after each read + description: str + Annotation for progress bar + quiet: bool + Disable/Enable progress bar + allocator: Callable[[int], Buffer] + An allocator callback that returns an object implementing + the Python Buffer Protocol interface (PEP 3118) + + Yields + ------- + Tuple[int, int, np.ndarray] + The number of samples read, the index of the read and the unpacked data """ if nsamps is None: nsamps = self.header.nsamples - start From 3f2b03b1b08c3ceac405cb0a7b5248f0a130ffaa Mon Sep 17 00:00:00 2001 From: Ewan Barr Date: Wed, 3 Apr 2024 00:18:15 +0200 Subject: [PATCH 5/5] #33 updated kernel tests to use new interface --- tests/test_kernels.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/test_kernels.py b/tests/test_kernels.py index f007933..3b169ac 100644 --- a/tests/test_kernels.py +++ b/tests/test_kernels.py @@ -10,28 +10,33 @@ def test_unpack1_8(self): [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1], dtype=np.uint8 ) - np.testing.assert_array_equal(expected_bit1, kernels.unpack1_8(input_arr)) + unpacked = np.empty_like(expected_bit1) + np.testing.assert_array_equal(expected_bit1, kernels.unpack1_8(input_arr, unpacked)) def test_unpack2_8(self): input_arr = np.array([0, 2, 7, 23], dtype=np.uint8) expected_bit2 = np.array( [0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 3, 0, 1, 1, 3], dtype=np.uint8 ) - np.testing.assert_array_equal(expected_bit2, kernels.unpack2_8(input_arr)) + unpacked = np.empty_like(expected_bit2) + np.testing.assert_array_equal(expected_bit2, kernels.unpack2_8(input_arr, unpacked)) def test_unpack4_8(self): input_arr = np.array([0, 2, 7, 23], dtype=np.uint8) expected_bit4 = np.array([0, 0, 0, 2, 0, 7, 1, 7], dtype=np.uint8) - np.testing.assert_array_equal(expected_bit4, kernels.unpack4_8(input_arr)) + unpacked = np.empty_like(expected_bit4) + np.testing.assert_array_equal(expected_bit4, kernels.unpack4_8(input_arr, unpacked)) def test_pack2_8(self): input_arr = np.arange(255, dtype=np.uint8) - output = kernels.pack2_8(kernels.unpack2_8(input_arr)) + unpacked = np.empty(input_arr.size * 4, dtype="ubyte") + output = kernels.pack2_8(kernels.unpack2_8(input_arr, unpacked)) np.testing.assert_array_equal(input_arr, output) def test_pack4_8(self): input_arr = np.arange(255, dtype=np.uint8) - output = kernels.pack4_8(kernels.unpack4_8(input_arr)) + unpacked = np.empty(input_arr.size * 2, dtype="ubyte") + output = kernels.pack4_8(kernels.unpack4_8(input_arr, unpacked)) np.testing.assert_array_equal(input_arr, output)