#33 Implementation of buffered file read (WIP) #34

Merged · 5 commits · Apr 4, 2024
15 changes: 6 additions & 9 deletions sigpyproc/core/kernels.py
@@ -5,10 +5,9 @@
 from scipy import constants


-@njit("u1[:](u1[:])", cache=True, parallel=True)
-def unpack1_8(array):
+@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True)
+def unpack1_8(array, unpacked):
     bitfact = 8
-    unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
     for ii in prange(array.size):
         unpacked[ii * bitfact + 0] = (array[ii] >> 7) & 1
         unpacked[ii * bitfact + 1] = (array[ii] >> 6) & 1
@@ -21,10 +20,9 @@ def unpack1_8(array):
     return unpacked


-@njit("u1[:](u1[:])", cache=True, parallel=True)
-def unpack2_8(array):
+@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True)
+def unpack2_8(array, unpacked):
     bitfact = 8 // 2
-    unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
     for ii in prange(array.size):
         unpacked[ii * bitfact + 0] = (array[ii] & 0xC0) >> 6
         unpacked[ii * bitfact + 1] = (array[ii] & 0x30) >> 4
@@ -33,10 +31,9 @@ def unpack2_8(array):
     return unpacked


-@njit("u1[:](u1[:])", cache=True, parallel=True)
-def unpack4_8(array):
+@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True)
+def unpack4_8(array, unpacked):
     bitfact = 8 // 4
-    unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
     for ii in prange(array.size):
         unpacked[ii * bitfact + 0] = (array[ii] & 0xF0) >> 4
         unpacked[ii * bitfact + 1] = (array[ii] & 0x0F) >> 0
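
With the new signatures the caller owns the output array, so repeated calls can reuse a single allocation. A minimal sketch of calling the reworked 1-bit kernel (input value chosen for illustration; the big-endian bit ordering matches the existing bits.unpack docstring):

    import numpy as np
    from sigpyproc.core import kernels

    packed = np.array([23], dtype=np.uint8)          # 0b00010111
    out = np.zeros(packed.size * 8, dtype=np.uint8)  # caller-owned output, 8 bits per byte
    bits = kernels.unpack1_8(packed, out)            # fills `out` in place and returns it
    np.testing.assert_array_equal(bits, np.unpackbits(packed, bitorder="big"))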
14 changes: 10 additions & 4 deletions sigpyproc/io/bits.py
@@ -9,7 +9,7 @@
 nbits_to_dtype = {1: "<u1", 2: "<u1", 4: "<u1", 8: "<u1", 16: "<u2", 32: "<f4"}


-def unpack(array: np.ndarray, nbits: int) -> np.ndarray:
+def unpack(array: np.ndarray, nbits: int, unpacked: np.ndarray = None) -> np.ndarray:
     """Unpack 1, 2 and 4 bit array. Only unpacks in big endian bit ordering.

     Parameters
@@ -30,12 +30,18 @@
         if nbits is not 1, 2, or 4
     """
     assert array.dtype == np.uint8, "Array must be uint8"
+    bitfact = 8 // nbits
+    if unpacked is None:
+        unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
+    else:
+        if unpacked.size != array.size * bitfact:
+            raise ValueError(f"Unpacking array must be {bitfact}x input size")
     if nbits == 1:
-        unpacked = np.unpackbits(array, bitorder="big")
+        unpacked = kernels.unpack1_8(array, unpacked)
     elif nbits == 2:
-        unpacked = kernels.unpack2_8(array)
+        unpacked = kernels.unpack2_8(array, unpacked)
     elif nbits == 4:
-        unpacked = kernels.unpack4_8(array)
+        unpacked = kernels.unpack4_8(array, unpacked)
     else:
         raise ValueError("nbits must be 1, 2, or 4")
     return unpacked
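
unpack() now has two paths: with no output array it allocates one internally (the old behaviour), and with a caller-supplied array it validates the size and fills it in place. A short sketch, assuming the module is importable as sigpyproc.io.bits as elsewhere in this PR (array values are illustrative):

    import numpy as np
    from sigpyproc.io.bits import unpack

    data = np.array([0, 2, 7, 23], dtype=np.uint8)

    # Path 1: no buffer supplied -- unpack() allocates the output itself.
    out4 = unpack(data, nbits=4)                     # 2 samples per byte -> 8 values

    # Path 2: reuse a caller-owned buffer sized to array.size * (8 // nbits).
    scratch = np.zeros(data.size * 4, dtype=np.uint8)
    out2 = unpack(data, nbits=2, unpacked=scratch)   # 4 samples per byte, fills `scratch`

    # A wrongly sized buffer raises ValueError ("Unpacking array must be 4x input size").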
59 changes: 58 additions & 1 deletion sigpyproc/io/fileio.py
@@ -4,6 +4,13 @@
 import warnings
 import numpy as np

+try:
+    from collections.abc import Buffer
+except ImportError:
+    from typing import Any, NewType
+    Buffer = NewType("Buffer", Any)
+
+
 from sigpyproc.io.bits import BitsInfo, unpack, pack
 from sigpyproc.utils import get_logger

@@ -32,7 +39,15 @@
         self._close_current()
         self.file_obj = file_obj
         self.ifile_cur = ifile

+    def eos(self):
+        """Check if the end of the file stream has been reached."""
+        # First check if we are at the end of the current file
+        eof = self.file_obj.tell() == os.fstat(self.file_obj.fileno()).st_size
+        # Now check if we are at the end of the list of files
+        eol = self.ifile_cur == len(self.files) - 1
+        return eof & eol
+
     def _close_current(self):
         """Close the currently open local file, and therewith the set."""
         if self.ifile_cur is not None:
@@ -118,6 +133,48 @@
         if self.bitsinfo.unpack:
             return unpack(data_ar, self.nbits)
         return data_ar

+    def creadinto(self, read_buffer: Buffer, unpack_buffer: Buffer = None) -> int:
+        """Read from the file stream into a buffer of pre-defined length.
+
+        Parameters
+        ----------
+        read_buffer : Buffer
+            An object exposing the Python Buffer Protocol interface [PEP 3118]
+
+        Returns
+        -------
+        int
+            The number of bytes read into the buffer
+
+        Raises
+        ------
+        IOError
+            if file is closed.
+
+        Detail
+        ------
+        It is the responsibility of the caller to handle the case where fewer bytes
+        than requested are read into the buffer. When at the end of the file stream
+        the number of bytes returned will be zero.
+        """
+        if self.file_obj.closed:
+            raise IOError("Cannot read closed file.")
+        nbytes = 0
+        view = memoryview(read_buffer)
+        while True:
+            nbytes += self.file_obj.readinto(view[nbytes:])
+            if nbytes == len(read_buffer) or self.eos():
+                # We have either filled the buffer or reached the end of the stream
+                break
+            else:
+                self._seek2hdr(self.ifile_cur + 1)
+        if self.bitsinfo.unpack:
+            unpack_ar = np.frombuffer(unpack_buffer, dtype=np.uint8)
+            read_ar = np.frombuffer(read_buffer, dtype=np.uint8)
+            unpack(read_ar, self.nbits, unpack_ar)
+        return nbytes
+
+
     def seek(self, offset: int, whence: int = 0) -> None:
         """Change the multifile stream position to the given data offset.
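
A rough usage sketch of the new eos()/creadinto() pair, assuming `fio` is an already-open instance of the file-stream class patched above (the request size and variable names are placeholders, not part of the PR):

    import numpy as np

    nbytes_req = 1024 * fio.bitsinfo.itemsize            # hypothetical request size
    read_buf = bytearray(nbytes_req)                      # any writable PEP 3118 buffer works

    if fio.bitsinfo.unpack:
        # 1/2/4-bit data: the unpack buffer holds the 8-bit expanded samples,
        # i.e. bitfact times as many bytes as were read.
        unpack_buf = bytearray(nbytes_req * fio.bitsinfo.bitfact)
        nbytes = fio.creadinto(read_buf, unpack_buf)
        samples = np.frombuffer(unpack_buf, dtype=np.uint8)
    else:
        nbytes = fio.creadinto(read_buf)
        samples = np.frombuffer(read_buf, dtype=fio.bitsinfo.dtype)

    if nbytes == 0:
        ...  # end of the whole file stream (eos() is True); the caller decides what to do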
102 changes: 100 additions & 2 deletions sigpyproc/readers.py
@@ -2,7 +2,13 @@
 import numpy as np
 import inspect

-from collections.abc import Iterator
+from collections.abc import Iterator, Callable
+try:
+    from collections.abc import Buffer
+except ImportError:
+    from typing import Any, NewType
+    Buffer = NewType("Buffer", Any)
+
 from rich.progress import track

 from sigpyproc.io import pfits
@@ -119,7 +125,7 @@
         nsamps: int | None = None,
         skipback: int = 0,
         description: str | None = None,
-        quiet: bool = False,
+        quiet: bool = False
     ) -> Iterator[tuple[int, int, np.ndarray]]:
         if nsamps is None:
             nsamps = self.header.nsamples - start
@@ -152,6 +158,98 @@
             )
             yield block // self.header.nchans, ii, data

+    def _safe_allocate(self, allocator, nbytes):
+        buffer = allocator(nbytes)
+        if len(buffer) != nbytes:
+            raise ValueError(f"Allocated buffer is not the expected size {len(buffer)} (actual) != {nbytes} (expected)")
+        return buffer
+
+    def read_plan_buffered(
+        self,
+        gulp: int = 16384,
+        start: int = 0,
+        nsamps: int | None = None,
+        skipback: int = 0,
+        description: str | None = None,
+        quiet: bool = False,
+        allocator: Callable[[int], Buffer] = None,
+    ) -> Iterator[tuple[int, int, np.ndarray]]:
+        """
+        Iterate over the file, reading into a pre-allocated buffer.
+
+        Parameters
+        ----------
+        gulp : int
+            The number of samples (spectra) to read in each iteration
+        start : int
+            The starting sample (spectrum) to read from
+        nsamps : int
+            The total number of samples (spectra) to read
+        skipback : int
+            The number of samples (spectra) to seek back after each read
+        description : str
+            Annotation for the progress bar
+        quiet : bool
+            Disable/enable the progress bar
+        allocator : Callable[[int], Buffer]
+            An allocator callback that returns an object implementing
+            the Python Buffer Protocol interface (PEP 3118)
+
+        Yields
+        ------
+        tuple[int, int, np.ndarray]
+            The number of samples read, the index of the read, and the unpacked data
+        """
+        if nsamps is None:
+            nsamps = self.header.nsamples - start
+        if description is None:
+            description = f"{inspect.stack()[1][3]} : "
+
+        gulp = min(nsamps, gulp)
+        skipback = abs(skipback)
+        if skipback >= gulp:
+            raise ValueError(f"readsamps ({gulp}) must be > skipback ({skipback})")
+
+        # Here we set the allocator
+        allocator = allocator if allocator is not None else bytearray
+
+        # Here we allocate the buffer that the file will be read into
+        read_buffer = self._safe_allocate(allocator, gulp * self.sampsize)
+        # Here we allocate (if needed) a buffer for unpacking the data
+        if self.bitsinfo.unpack:
+            # The unpacking always expands to 8 bits, so the size should be nsamps * nchans
+            # Is there a way to guarantee that the behaviour is correct here?
+            unpack_buffer = self._safe_allocate(allocator, gulp * self.header.nchans)
+            data = np.frombuffer(unpack_buffer, dtype=self._file.bitsinfo.dtype)
+        else:
+            unpack_buffer = None
+            data = np.frombuffer(read_buffer, dtype=self._file.bitsinfo.dtype)
+
+        self._file.seek(start * self.sampsize)
+        nreads, lastread = divmod(nsamps, (gulp - skipback))
+        if lastread < skipback:
+            nreads -= 1
+            lastread = nsamps - (nreads * (gulp - skipback))
+
+        blocks = [
+            (ii, gulp * self.header.nchans, -skipback * self.header.nchans)
+            for ii in range(nreads)
+        ]
+        if lastread != 0:
+            blocks.append((nreads, lastread * self.header.nchans, 0))
+
+        # / self.logger.debug(f"Reading plan: nsamps = {nsamps}, nreads = {nreads}")
+        # / self.logger.debug(f"Reading plan: gulp = {gulp}, lastread = {lastread}, skipback = {skipback}")
+        for ii, block, skip in track(blocks, description=description, disable=quiet):
+            nbytes = self._file.creadinto(read_buffer, unpack_buffer)
+            expected_nbytes = block * self.sampsize / self.header.nchans
+            if nbytes != expected_nbytes:
+                raise ValueError(f"Unexpected number of bytes read from file {nbytes} (actual) != {expected_nbytes} (expected)")
+            if skip != 0:
+                self._file.seek(
+                    skip * self.bitsinfo.itemsize // self.bitsinfo.bitfact, whence=1
+                )
+            yield block // self.header.nchans, ii, data[:block]


 class PFITSReader(Filterbank):
     """
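
For context, an end-to-end sketch of the new iteration API, assuming the class extended above is the standard sigpyproc FilReader (the file name and the NumPy-backed allocator below are placeholders, not part of this PR):

    import numpy as np
    from sigpyproc.readers import FilReader

    def numpy_allocator(nbytes: int) -> memoryview:
        # Hypothetical allocator: hand back a view over a NumPy-owned block;
        # a real pipeline might return pinned or shared memory instead.
        return memoryview(np.zeros(nbytes, dtype=np.uint8))

    fil = FilReader("observation.fil")  # placeholder file name

    # Default path: bytearray-backed buffers, reused across every gulp.
    for nsamps_read, iread, data in fil.read_plan_buffered(gulp=8192, quiet=True):
        # `data` is already truncated to nsamps_read * nchans values and points into
        # the reused buffer, so copy it if it must outlive this iteration.
        block = data.reshape(nsamps_read, fil.header.nchans)

    # Same loop, but every buffer comes from the custom allocator.
    for nsamps_read, iread, data in fil.read_plan_buffered(allocator=numpy_allocator):
        pass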
12 changes: 11 additions & 1 deletion tests/conftest.py
@@ -12,7 +12,6 @@ def tmpfile(tmp_path_factory, content=""):
     fn.write_text(content)
     return fn.as_posix()

-
 @pytest.fixture(scope="session", autouse=True)
 def filfile_1bit():
     return Path(_datadir / "parkes_1bit.fil").as_posix()
@@ -37,6 +36,17 @@ def filfile_8bit_1():
 def filfile_8bit_2():
     return Path(_datadir / "parkes_8bit_2.fil").as_posix()

+@pytest.fixture(scope="session", autouse=True)
+def filterbank_files():
+    return [
+        Path(_datadir / "parkes_1bit.fil").as_posix(),
+        Path(_datadir / "parkes_2bit.fil").as_posix(),
+        Path(_datadir / "parkes_4bit.fil").as_posix(),
+        [Path(_datadir / "parkes_8bit_1.fil").as_posix(),
+         Path(_datadir / "parkes_8bit_2.fil").as_posix()],
+        Path(_datadir / "tutorial.fil").as_posix(),
+        Path(_datadir / "tutorial_2bit.fil").as_posix(),
+    ]

 @pytest.fixture(scope="session", autouse=True)
 def fitsfile_4bit():
15 changes: 10 additions & 5 deletions tests/test_kernels.py
@@ -10,28 +10,33 @@ def test_unpack1_8(self):
             [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
              0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1], dtype=np.uint8
         )
-        np.testing.assert_array_equal(expected_bit1, kernels.unpack1_8(input_arr))
+        unpacked = np.empty_like(expected_bit1)
+        np.testing.assert_array_equal(expected_bit1, kernels.unpack1_8(input_arr, unpacked))

     def test_unpack2_8(self):
         input_arr = np.array([0, 2, 7, 23], dtype=np.uint8)
         expected_bit2 = np.array(
             [0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 3, 0, 1, 1, 3], dtype=np.uint8
         )
-        np.testing.assert_array_equal(expected_bit2, kernels.unpack2_8(input_arr))
+        unpacked = np.empty_like(expected_bit2)
+        np.testing.assert_array_equal(expected_bit2, kernels.unpack2_8(input_arr, unpacked))

     def test_unpack4_8(self):
         input_arr = np.array([0, 2, 7, 23], dtype=np.uint8)
         expected_bit4 = np.array([0, 0, 0, 2, 0, 7, 1, 7], dtype=np.uint8)
-        np.testing.assert_array_equal(expected_bit4, kernels.unpack4_8(input_arr))
+        unpacked = np.empty_like(expected_bit4)
+        np.testing.assert_array_equal(expected_bit4, kernels.unpack4_8(input_arr, unpacked))

     def test_pack2_8(self):
         input_arr = np.arange(255, dtype=np.uint8)
-        output = kernels.pack2_8(kernels.unpack2_8(input_arr))
+        unpacked = np.empty(input_arr.size * 4, dtype="ubyte")
+        output = kernels.pack2_8(kernels.unpack2_8(input_arr, unpacked))
         np.testing.assert_array_equal(input_arr, output)

     def test_pack4_8(self):
         input_arr = np.arange(255, dtype=np.uint8)
-        output = kernels.pack4_8(kernels.unpack4_8(input_arr))
+        unpacked = np.empty(input_arr.size * 2, dtype="ubyte")
+        output = kernels.pack4_8(kernels.unpack4_8(input_arr, unpacked))
         np.testing.assert_array_equal(input_arr, output)