diff --git a/sigmf/error.py b/sigmf/error.py index 92ac194..9f2564c 100644 --- a/sigmf/error.py +++ b/sigmf/error.py @@ -8,21 +8,17 @@ class SigMFError(Exception): - """ SigMF base exception.""" - pass + """SigMF base exception.""" class SigMFValidationError(SigMFError): """Exceptions related to validating SigMF metadata.""" - pass class SigMFAccessError(SigMFError): """Exceptions related to accessing the contents of SigMF metadata, notably when expected fields are missing or accessing out of bounds captures.""" - pass class SigMFFileError(SigMFError): """Exceptions related to reading or writing SigMF files or archives.""" - pass diff --git a/sigmf/schema.py b/sigmf/schema.py index d576e26..530d3fe 100644 --- a/sigmf/schema.py +++ b/sigmf/schema.py @@ -4,7 +4,7 @@ # # SPDX-License-Identifier: LGPL-3.0-or-later -'''Schema IO''' +"""Schema IO""" import json from pathlib import Path @@ -12,8 +12,8 @@ from . import __version__ as toolversion from . import utils -SCHEMA_META = 'schema-meta.json' -SCHEMA_COLLECTION = 'schema-collection.json' +SCHEMA_META = "schema-meta.json" +SCHEMA_COLLECTION = "schema-collection.json" def get_schema(version=toolversion, schema_file=SCHEMA_META): @@ -23,6 +23,6 @@ def get_schema(version=toolversion, schema_file=SCHEMA_META): TODO: In the future load specific schema versions. 
''' schema_dir = Path(__file__).parent - with open(schema_dir / schema_file, 'rb') as handle: + with open(schema_dir / schema_file, "rb") as handle: schema = json.load(handle) return schema diff --git a/sigmf/sigmf_hash.py b/sigmf/sigmf_hash.py index bf31f8d..9482c35 100644 --- a/sigmf/sigmf_hash.py +++ b/sigmf/sigmf_hash.py @@ -4,7 +4,7 @@ # # SPDX-License-Identifier: LGPL-3.0-or-later -'''Hashing Functions''' +"""Hashing Functions""" import hashlib from pathlib import Path diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index d449024..1dd27b8 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -333,7 +333,7 @@ def add_capture(self, start_index, metadata=None): # sort captures by start_index self._metadata[self.CAPTURE_KEY] = sorted( capture_list, - key=lambda item: item[self.START_INDEX_KEY] + key=lambda item: item[self.START_INDEX_KEY], ) def get_captures(self): @@ -374,13 +374,17 @@ def get_capture_byte_boundarys(self, index): compliant or noncompliant SigMF Recordings. 
""" if index >= len(self.get_captures()): - raise SigMFAccessError("Invalid captures index {} (only {} captures in Recording)".format(index, len(self.get_captures()))) + raise SigMFAccessError( + "Invalid captures index {} (only {} captures in Recording)".format(index, len(self.get_captures())) + ) start_byte = 0 prev_start_sample = 0 for ii, capture in enumerate(self.get_captures()): start_byte += capture.get(self.HEADER_BYTES_KEY, 0) - start_byte += (self.get_capture_start(ii) - prev_start_sample) * self.get_sample_size() * self.get_num_channels() + start_byte += ( + (self.get_capture_start(ii) - prev_start_sample) * self.get_sample_size() * self.get_num_channels() + ) prev_start_sample = self.get_capture_start(ii) if ii >= index: break @@ -389,7 +393,11 @@ def get_capture_byte_boundarys(self, index): if index == len(self.get_captures()) - 1: # last captures...data is the rest of the file end_byte = self.data_file.stat().st_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) else: - end_byte += (self.get_capture_start(index+1) - self.get_capture_start(index)) * self.get_sample_size() * self.get_num_channels() + end_byte += ( + (self.get_capture_start(index + 1) - self.get_capture_start(index)) + * self.get_sample_size() + * self.get_num_channels() + ) return (start_byte, end_byte) def add_annotation(self, start_index, length=None, metadata=None): @@ -408,7 +416,7 @@ def add_annotation(self, start_index, length=None, metadata=None): # sort annotations by start_index self._metadata[self.ANNOTATION_KEY] = sorted( self._metadata[self.ANNOTATION_KEY], - key=lambda item: item[self.START_INDEX_KEY] + key=lambda item: item[self.START_INDEX_KEY], ) def get_annotations(self, index=None): @@ -465,13 +473,18 @@ def _count_samples(self): header_bytes = sum([c.get(self.HEADER_BYTES_KEY, 0) for c in self.get_captures()]) file_size = self.data_file.stat().st_size if self.data_size_bytes is None else self.data_size_bytes file_data_size = file_size - 
self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes # bytes - sample_size = self.get_sample_size() # size of a sample in bytes + sample_size = self.get_sample_size() # size of a sample in bytes num_channels = self.get_num_channels() sample_count = file_data_size // sample_size // num_channels if file_data_size % (sample_size * num_channels) != 0: - warnings.warn(f"File `{self.data_file}` does not contain an integer number of samples across channels. It may be invalid data.") + warnings.warn( + f"File `{self.data_file}` does not contain an integer number of samples across channels. " + "It may be invalid data." + ) if self._get_sample_count_from_annotations() > sample_count: - warnings.warn(f"File `{self.data_file}` ends before the final annotation in the corresponding SigMF metadata.") + warnings.warn( + f"File `{self.data_file}` ends before the final annotation in the corresponding SigMF metadata." + ) self.sample_count = sample_count return sample_count @@ -502,9 +515,17 @@ def calculate_hash(self): """ old_hash = self.get_global_field(self.HASH_KEY) if self.data_file is not None: - new_hash = sigmf_hash.calculate_sha512(self.data_file, offset=self.data_offset, size=self.data_size_bytes) + new_hash = sigmf_hash.calculate_sha512( + filename=self.data_file, + offset=self.data_offset, + size=self.data_size_bytes, + ) else: - new_hash = sigmf_hash.calculate_sha512(fileobj=self.data_buffer, offset=self.data_offset, size=self.data_size_bytes) + new_hash = sigmf_hash.calculate_sha512( + fileobj=self.data_buffer, + offset=self.data_offset, + size=self.data_size_bytes, + ) if old_hash is not None: if old_hash != new_hash: raise SigMFFileError("Calculated file hash does not match associated metadata.") @@ -512,7 +533,9 @@ def calculate_hash(self): self.set_global_field(self.HASH_KEY, new_hash) return new_hash - def set_data_file(self, data_file=None, data_buffer=None, skip_checksum=False, offset=0, size_bytes=None, map_readonly=True): + def set_data_file( + 
self, data_file=None, data_buffer=None, skip_checksum=False, offset=0, size_bytes=None, map_readonly=True + ): """ Set the datafile path, then recalculate sample count. If not skipped, update the hash and return the hash string. @@ -727,7 +750,13 @@ class SigMFCollection(SigMFMetafile): STREAMS_KEY = "core:streams" COLLECTION_KEY = "collection" VALID_COLLECTION_KEYS = [ - AUTHOR_KEY, COLLECTION_DOI_KEY, DESCRIPTION_KEY, EXTENSIONS_KEY, LICENSE_KEY, STREAMS_KEY, VERSION_KEY + AUTHOR_KEY, + COLLECTION_DOI_KEY, + DESCRIPTION_KEY, + EXTENSIONS_KEY, + LICENSE_KEY, + STREAMS_KEY, + VERSION_KEY, ] VALID_KEYS = {COLLECTION_KEY: VALID_COLLECTION_KEYS} diff --git a/tests/test_utils.py b/tests/test_utils.py index e4cb9a5..18c4f0b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -13,6 +13,7 @@ from sigmf import utils +# fmt: off @pytest.mark.parametrize("time_str, expected", [ ("1955-07-04T05:15:00Z", datetime(year=1955, month=7, day=4, hour=5, minute=15, second=00, microsecond=0, tzinfo=timezone.utc)), ("2956-08-05T06:15:12Z", datetime(year=2956, month=8, day=5, hour=6, minute=15, second=12, microsecond=0, tzinfo=timezone.utc)), @@ -20,8 +21,9 @@ ("4958-10-07T08:15:12.0345Z", datetime(year=4958, month=10, day=7, hour=8, minute=15, second=12, microsecond=34500, tzinfo=timezone.utc)), ("5959-11-08T09:15:12.000000Z", datetime(year=5959, month=11, day=8, hour=9, minute=15, second=12, microsecond=0, tzinfo=timezone.utc)), ("6960-12-09T10:15:12.123456789123Z", datetime(year=6960, month=12, day=9, hour=10, minute=15, second=12, microsecond=123456, tzinfo=timezone.utc)), - ]) +]) +# fmt: on def test_parse_simple_iso8601(time_str: str, expected: datetime) -> None: """Ensure various times are represented as expected""" date_struct = utils.parse_iso8601_datetime(time_str)