Skip to content

Commit

Permalink
Merge pull request #34 from ewanbarr/main
Browse files Browse the repository at this point in the history
#33 Implementation of buffered file read (WIP)
  • Loading branch information
pravirkr authored Apr 4, 2024
2 parents 9f95279 + 3f2b03b commit e962bba
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 22 deletions.
15 changes: 6 additions & 9 deletions sigpyproc/core/kernels.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
from scipy import constants


@njit("u1[:](u1[:])", cache=True, parallel=True)
def unpack1_8(array):
@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True)
def unpack1_8(array, unpacked):
bitfact = 8
unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
for ii in prange(array.size):
unpacked[ii * bitfact + 0] = (array[ii] >> 7) & 1
unpacked[ii * bitfact + 1] = (array[ii] >> 6) & 1
Expand All @@ -21,10 +20,9 @@ def unpack1_8(array):
return unpacked


@njit("u1[:](u1[:])", cache=True, parallel=True)
def unpack2_8(array):
@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True)
def unpack2_8(array, unpacked):
bitfact = 8 // 2
unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
for ii in prange(array.size):
unpacked[ii * bitfact + 0] = (array[ii] & 0xC0) >> 6
unpacked[ii * bitfact + 1] = (array[ii] & 0x30) >> 4
Expand All @@ -33,10 +31,9 @@ def unpack2_8(array):
return unpacked


@njit("u1[:](u1[:])", cache=True, parallel=True)
def unpack4_8(array):
@njit("u1[:](u1[:], u1[:])", cache=True, parallel=True)
def unpack4_8(array, unpacked):
bitfact = 8 // 4
unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
for ii in prange(array.size):
unpacked[ii * bitfact + 0] = (array[ii] & 0xF0) >> 4
unpacked[ii * bitfact + 1] = (array[ii] & 0x0F) >> 0
Expand Down
14 changes: 10 additions & 4 deletions sigpyproc/io/bits.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
nbits_to_dtype = {1: "<u1", 2: "<u1", 4: "<u1", 8: "<u1", 16: "<u2", 32: "<f4"}


def unpack(array: np.ndarray, nbits: int) -> np.ndarray:
def unpack(array: np.ndarray, nbits: int, unpacked: np.ndarray = None) -> np.ndarray:
"""Unpack 1, 2 and 4 bit array. Only unpacks in big endian bit ordering.
Parameters
Expand All @@ -30,12 +30,18 @@ def unpack(array: np.ndarray, nbits: int) -> np.ndarray:
if nbits is not 1, 2, or 4
"""
assert array.dtype == np.uint8, "Array must be uint8"
bitfact = 8//nbits
if unpacked is None:
unpacked = np.zeros(shape=array.size * bitfact, dtype=np.uint8)
else:
if unpacked.size != array.size * bitfact:
raise ValueError(f"Unpacking array must be {bitfact}x input size")
if nbits == 1:
unpacked = np.unpackbits(array, bitorder="big")
unpacked = kernels.unpack1_8(array, unpacked)
elif nbits == 2:
unpacked = kernels.unpack2_8(array)
unpacked = kernels.unpack2_8(array, unpacked)
elif nbits == 4:
unpacked = kernels.unpack4_8(array)
unpacked = kernels.unpack4_8(array, unpacked)
else:
raise ValueError("nbits must be 1, 2, or 4")
return unpacked
Expand Down
59 changes: 58 additions & 1 deletion sigpyproc/io/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import warnings
import numpy as np

try:
from collections.abc import Buffer
except ImportError:
from typing import Any, NewType
Buffer = NewType("Buffer", Any)


from sigpyproc.io.bits import BitsInfo, unpack, pack
from sigpyproc.utils import get_logger

Expand Down Expand Up @@ -32,7 +39,15 @@ def _open(self, ifile):
self._close_current()
self.file_obj = file_obj
self.ifile_cur = ifile


def eos(self):
    """Check whether the end of the multi-file stream has been reached.

    Returns
    -------
    bool
        True only when the currently open file is the last entry of
        ``self.files`` and its read position equals the size of that file.
    """
    # Only the last file of the list can terminate the stream. Testing this
    # first short-circuits, so no tell()/fstat call is made for earlier files.
    if self.ifile_cur != len(self.files) - 1:
        return False
    # The position equals the on-disk size once the whole file is consumed.
    return self.file_obj.tell() == os.fstat(self.file_obj.fileno()).st_size

def _close_current(self):
"""Close the currently open local file, and therewith the set."""
if self.ifile_cur is not None:
Expand Down Expand Up @@ -118,6 +133,48 @@ def cread(self, nunits: int) -> np.ndarray:
if self.bitsinfo.unpack:
return unpack(data_ar, self.nbits)
return data_ar

def creadinto(self, read_buffer: Buffer, unpack_buffer: Buffer = None) -> int:
    """Read from the file stream into a pre-allocated buffer.

    Parameters
    ----------
    read_buffer : Buffer
        A writable object exposing the Python buffer protocol (PEP 3118).
        Its total size in bytes is the number of bytes requested.
    unpack_buffer : Buffer, optional
        A writable buffer that receives the unpacked data. Required when
        ``self.bitsinfo.unpack`` is set, unused otherwise.

    Returns
    -------
    int
        The number of bytes read into ``read_buffer``.

    Raises
    ------
    IOError
        If the file is closed.
    ValueError
        If unpacking is enabled but no ``unpack_buffer`` was given.

    Notes
    -----
    It is the responsibility of the caller to handle the case that fewer
    bytes than requested are read into the buffer. At the end of the file
    stream the number of bytes returned is zero.
    """
    if self.file_obj.closed:
        raise IOError("Cannot read closed file.")
    # Fail before consuming any data rather than after the read has moved
    # the stream position.
    if self.bitsinfo.unpack and unpack_buffer is None:
        raise ValueError("An unpack buffer is required when unpacking is enabled")

    # View the target as flat bytes so that offsets and sizes are counted in
    # bytes whatever the item size of the object that was passed in.
    view = memoryview(read_buffer).cast("B")
    nbytes = 0
    while nbytes < view.nbytes:
        nread = self.file_obj.readinto(view[nbytes:])
        if nread:
            nbytes += nread
            continue
        # Nothing more to read from the current file.
        if self.eos():
            break
        # Carry on with the data section of the next file in the list.
        self._seek2hdr(self.ifile_cur + 1)

    if self.bitsinfo.unpack:
        unpack_ar = np.frombuffer(unpack_buffer, dtype=np.uint8)
        read_ar = np.frombuffer(view, dtype=np.uint8)
        # The whole read buffer is unpacked, also after a short read.
        unpack(read_ar, self.nbits, unpack_ar)
    return nbytes


def seek(self, offset: int, whence: int = 0) -> None:
"""Change the multifile stream position to the given data offset.
Expand Down
102 changes: 100 additions & 2 deletions sigpyproc/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
import numpy as np
import inspect

from collections.abc import Iterator
from collections.abc import Iterator, Callable
try:
from collections.abc import Buffer
except ImportError:
from typing import Any, NewType
Buffer = NewType("Buffer", Any)

from rich.progress import track

from sigpyproc.io import pfits
Expand Down Expand Up @@ -119,7 +125,7 @@ def read_plan(
nsamps: int | None = None,
skipback: int = 0,
description: str | None = None,
quiet: bool = False,
quiet: bool = False
) -> Iterator[tuple[int, int, np.ndarray]]:
if nsamps is None:
nsamps = self.header.nsamples - start
Expand Down Expand Up @@ -152,6 +158,98 @@ def read_plan(
)
yield block // self.header.nchans, ii, data

def _safe_allocate(self, allocator, nbytes):
buffer = allocator(nbytes)
if len(buffer) != nbytes:
raise ValueError(f"Allocated buffer is not the expected size {len(buffer)} (actual) != {nbytes} (expected)")
return buffer

def read_plan_buffered(
    self,
    gulp: int = 16384,
    start: int = 0,
    nsamps: int | None = None,
    skipback: int = 0,
    description: str | None = None,
    quiet: bool = False,
    allocator: Callable[[int], Buffer] = None,
) -> Iterator[tuple[int, int, np.ndarray]]:
    """Iterate over the file, reading into a pre-allocated buffer.

    Parameters
    ----------
    gulp : int
        The number of samples (spectra) to read in each iteration
    start : int
        The starting sample (spectrum) to read from
    nsamps : int, optional
        The total number of samples (spectra) to read. Defaults to all
        samples from ``start`` to the end of the data.
    skipback : int
        The number of samples (spectra) to seek back after each read
    description : str, optional
        Annotation for progress bar. Defaults to the name of the caller.
    quiet : bool
        Disable/Enable progress bar
    allocator : Callable[[int], Buffer], optional
        An allocator callback that is given a size in bytes and returns an
        object implementing the Python Buffer Protocol interface
        (PEP 3118). Defaults to ``bytearray``.

    Yields
    ------
    tuple[int, int, np.ndarray]
        The number of samples read, the index of the read and the unpacked
        data. The array is a view onto the buffer that is reused by every
        iteration, so its contents are overwritten by the next read; copy
        it if it has to outlive the iteration.

    Raises
    ------
    ValueError
        If ``skipback`` is not smaller than the number of samples per read,
        or if a read returns an unexpected number of bytes.
    """
    if nsamps is None:
        nsamps = self.header.nsamples - start
    if description is None:
        description = f"{inspect.stack()[1][3]} : "

    gulp = min(nsamps, gulp)
    skipback = abs(skipback)
    if skipback >= gulp:
        raise ValueError(f"readsamps ({gulp}) must be > skipback ({skipback})")

    if allocator is None:
        allocator = bytearray

    # Buffer that the raw file data is read into.
    read_buffer = self._safe_allocate(allocator, gulp * self.sampsize)
    read_view = memoryview(read_buffer).cast("B")
    if self.bitsinfo.unpack:
        # The unpacking always unpacks up to 8-bits, so the unpacked size is
        # nsamps * nchans bytes.
        unpack_buffer = self._safe_allocate(allocator, gulp * self.header.nchans)
        unpack_view = memoryview(unpack_buffer).cast("B")
        data = np.frombuffer(unpack_buffer, dtype=self._file.bitsinfo.dtype)
    else:
        unpack_view = None
        data = np.frombuffer(read_buffer, dtype=self._file.bitsinfo.dtype)

    self._file.seek(start * self.sampsize)
    nreads, lastread = divmod(nsamps, (gulp - skipback))
    if lastread < skipback:
        nreads -= 1
        lastread = nsamps - (nreads * (gulp - skipback))
    # Each entry: (read index, number of elements to read, elements to skip).
    blocks = [
        (ii, gulp * self.header.nchans, -skipback * self.header.nchans)
        for ii in range(nreads)
    ]
    if lastread != 0:
        blocks.append((nreads, lastread * self.header.nchans, 0))

    for ii, block, skip in track(blocks, description=description, disable=quiet):
        nsamps_block = block // self.header.nchans
        expected_nbytes = nsamps_block * self.sampsize
        # Request exactly the bytes of this block. Reading into the whole
        # buffer would run past a shorter final block whenever the plan ends
        # before the end of the stream, and then fail the size check below.
        nbytes = self._file.creadinto(
            read_view[:expected_nbytes],
            None if unpack_view is None else unpack_view[:block],
        )
        if nbytes != expected_nbytes:
            raise ValueError(
                f"Unexpected number of bytes read from file {nbytes} (actual) "
                f"!= {expected_nbytes} (expected)"
            )
        if skip != 0:
            # Step back over the skipback samples; whence=1 is presumably
            # relative to the current position, as in the io module.
            self._file.seek(
                skip * self.bitsinfo.itemsize // self.bitsinfo.bitfact, whence=1
            )
        yield nsamps_block, ii, data[:block]


class PFITSReader(Filterbank):
"""
Expand Down
12 changes: 11 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def tmpfile(tmp_path_factory, content=""):
fn.write_text(content)
return fn.as_posix()


@pytest.fixture(scope="session", autouse=True)
def filfile_1bit():
return Path(_datadir / "parkes_1bit.fil").as_posix()
Expand All @@ -37,6 +36,17 @@ def filfile_8bit_1():
def filfile_8bit_2():
return Path(_datadir / "parkes_8bit_2.fil").as_posix()

@pytest.fixture(scope="session", autouse=True)
def filterbank_files():
    """Return the filterbank test inputs as POSIX path strings.

    Every entry is a single path, except the two 8-bit Parkes files, which
    are grouped together in a nested list.
    """

    def _posix(name):
        # Resolve a file name inside the test data directory.
        return Path(_datadir / name).as_posix()

    parkes_8bit = [_posix("parkes_8bit_1.fil"), _posix("parkes_8bit_2.fil")]
    single_bit_files = [
        _posix(name)
        for name in ("parkes_1bit.fil", "parkes_2bit.fil", "parkes_4bit.fil")
    ]
    tutorial_files = [_posix("tutorial.fil"), _posix("tutorial_2bit.fil")]
    return [*single_bit_files, parkes_8bit, *tutorial_files]

@pytest.fixture(scope="session", autouse=True)
def fitsfile_4bit():
Expand Down
15 changes: 10 additions & 5 deletions tests/test_kernels.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,33 @@ def test_unpack1_8(self):
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1], dtype=np.uint8
)
np.testing.assert_array_equal(expected_bit1, kernels.unpack1_8(input_arr))
unpacked = np.empty_like(expected_bit1)
np.testing.assert_array_equal(expected_bit1, kernels.unpack1_8(input_arr, unpacked))

def test_unpack2_8(self):
input_arr = np.array([0, 2, 7, 23], dtype=np.uint8)
expected_bit2 = np.array(
[0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 3, 0, 1, 1, 3], dtype=np.uint8
)
np.testing.assert_array_equal(expected_bit2, kernels.unpack2_8(input_arr))
unpacked = np.empty_like(expected_bit2)
np.testing.assert_array_equal(expected_bit2, kernels.unpack2_8(input_arr, unpacked))

def test_unpack4_8(self):
input_arr = np.array([0, 2, 7, 23], dtype=np.uint8)
expected_bit4 = np.array([0, 0, 0, 2, 0, 7, 1, 7], dtype=np.uint8)
np.testing.assert_array_equal(expected_bit4, kernels.unpack4_8(input_arr))
unpacked = np.empty_like(expected_bit4)
np.testing.assert_array_equal(expected_bit4, kernels.unpack4_8(input_arr, unpacked))

def test_pack2_8(self):
input_arr = np.arange(255, dtype=np.uint8)
output = kernels.pack2_8(kernels.unpack2_8(input_arr))
unpacked = np.empty(input_arr.size * 4, dtype="ubyte")
output = kernels.pack2_8(kernels.unpack2_8(input_arr, unpacked))
np.testing.assert_array_equal(input_arr, output)

def test_pack4_8(self):
input_arr = np.arange(255, dtype=np.uint8)
output = kernels.pack4_8(kernels.unpack4_8(input_arr))
unpacked = np.empty(input_arr.size * 2, dtype="ubyte")
output = kernels.pack4_8(kernels.unpack4_8(input_arr, unpacked))
np.testing.assert_array_equal(input_arr, output)


Expand Down
Loading

0 comments on commit e962bba

Please sign in to comment.