diff --git a/admin/config/config.yml b/admin/config/config.yml
index 4b1c6231..d690ba7e 100755
--- a/admin/config/config.yml
+++ b/admin/config/config.yml
@@ -83,6 +83,7 @@ dn_retry_backoff_exp: 0.1 # backoff factor for retries
 xss_protection: "1; mode=block" # Include in response headers if set
 allow_any_bucket_read: true # enable reads to buckets other than default bucket
 allow_any_bucket_write: true # enable writes to buckets other than default bucket
+bit_shuffle_default_blocksize: 2048 # default blocksize for bitshuffle filter
 # DEPRECATED - the remaining config values are not used in currently but kept for backward compatibility with older container images
 aws_lambda_chunkread_function: null # name of aws lambda function for chunk reading
 aws_lambda_threshold: 4 # number of chunks per node per request to reach before using lambda
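The new bit_shuffle_default_blocksize value is a block size in elements; the _shuffle() changes in hsds/util/storUtil.py below read it via config.get() and record it in the chunk header in bytes (block_size * item_size). A minimal sketch of that lookup, reusing the call that appears in the _shuffle() hunk (the dtype variable here is illustrative):

    # sketch: block size in elements, falling back to 2048 when the key is unset
    block_size = config.get("bit_shuffle_default_blocksize", default=2048)
    block_nbytes = block_size * dtype.itemsize  # value written into the 12-byte header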
""" + item_size = dtype.itemsize + msg = f"get_chunk_bytes({chunk_id}, bucket={bucket}, offset={offset}, length={length}, " msg += f"item_size={item_size}, chunk_dims={chunk_dims}, hyper_dims={hyper_dims}" log.debug(msg) @@ -978,20 +980,21 @@ async def get_chunk( chunk_arr = None dims = getChunkLayout(dset_json) type_json = dset_json["type"] - item_size = getItemSize(type_json) + dt = createDataType(type_json) layout_json = dset_json["layout"] - log.debug(f"dset_json: {dset_json}") layout_class = layout_json.get("class") chunk_dims = getChunkLayout(dset_json) - dt = createDataType(type_json) # note - officially we should follow the order in which the filters are # defined in the filter_list, # but since we currently have just deflate and shuffle we will always # apply deflate then shuffle on read, and shuffle then deflate on write # also note - get deflate and shuffle will update the deflate and # shuffle map so that the s3sync will do the right thing - filter_ops = getFilterOps(app, dset_json, item_size) + + filters = getFilters(dset_json) + dset_id = dset_json["id"] + filter_ops = getFilterOps(app, dset_id, filters, dtype=dt, chunk_shape=chunk_dims) log.debug(f"filter_ops: {filter_ops}") if s3path: @@ -1049,7 +1052,7 @@ async def get_chunk( "filter_ops": filter_ops, "offset": s3offset, "length": s3size, - "item_size": item_size, + "dtype": dt, "chunk_dims": chunk_dims, "hyper_dims": hyper_dims, "bucket": bucket, @@ -1144,9 +1147,13 @@ def save_chunk(app, chunk_id, dset_json, chunk_arr, bucket=None): log.error(f"Chunk {chunk_id} not in partition") raise HTTPInternalServerError() - item_size = getItemSize(dset_json["type"]) + dset_id = dset_json["id"] + dtype = createDataType(dset_json["type"]) + chunk_shape = getChunkLayout(dset_json) + # will store filter options into app['filter_map'] - getFilterOps(app, dset_json, item_size) + filters = getFilters(dset_json) + getFilterOps(app, dset_id, filters, dtype=dtype, chunk_shape=chunk_shape) chunk_cache = app["chunk_cache"] if chunk_id not in chunk_cache: diff --git a/hsds/util/dsetUtil.py b/hsds/util/dsetUtil.py index dd9a8500..f610a5d4 100644 --- a/hsds/util/dsetUtil.py +++ b/hsds/util/dsetUtil.py @@ -75,6 +75,24 @@ ) +# copied from arrayUtil.py +def isVlen(dt): + """ + Return True if the type contains variable length elements + """ + is_vlen = False + if len(dt) > 1: + names = dt.names + for name in names: + if isVlen(dt[name]): + is_vlen = True + break + else: + if dt.metadata and "vlen" in dt.metadata: + is_vlen = True + return is_vlen + + def getFilterItem(key): """ Return filter code, id, and name, based on an id, a name or a code. 
diff --git a/hsds/util/dsetUtil.py b/hsds/util/dsetUtil.py
index dd9a8500..f610a5d4 100644
--- a/hsds/util/dsetUtil.py
+++ b/hsds/util/dsetUtil.py
@@ -75,6 +75,24 @@
 )
 
 
+# copied from arrayUtil.py
+def isVlen(dt):
+    """
+    Return True if the type contains variable length elements
+    """
+    is_vlen = False
+    if len(dt) > 1:
+        names = dt.names
+        for name in names:
+            if isVlen(dt[name]):
+                is_vlen = True
+                break
+    else:
+        if dt.metadata and "vlen" in dt.metadata:
+            is_vlen = True
+    return is_vlen
+
+
 def getFilterItem(key):
     """
     Return filter code, id, and name, based on an id, a name or a code.
@@ -100,9 +118,8 @@ def getFilters(dset_json):
     return filters
 
 
-def getCompressionFilter(dset_json):
+def getCompressionFilter(filters):
     """Return compression filter from filters, or None"""
-    filters = getFilters(dset_json)
     for filter in filters:
         if "class" not in filter:
             msg = f"filter option: {filter} with no class key"
@@ -122,9 +139,8 @@
     return None
 
 
-def getShuffleFilter(dset_json):
+def getShuffleFilter(filters):
     """Return shuffle filter, or None"""
-    filters = getFilters(dset_json)
     FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")
     for filter in filters:
         log.debug(f"filter: {filter}")
@@ -142,21 +158,22 @@
     return None
 
 
-def getFilterOps(app, dset_json, item_size):
+def getFilterOps(app, dset_id, filters, dtype=None, chunk_shape=None):
     """Get list of filter operations to be used for this dataset"""
     filter_map = app["filter_map"]
-    dset_id = dset_json["id"]
+
     if dset_id in filter_map:
         log.debug(f"returning filter from filter_map {filter_map[dset_id]}")
         return filter_map[dset_id]
 
-    compressionFilter = getCompressionFilter(dset_json)
+    compressionFilter = getCompressionFilter(filters)
     log.debug(f"got compressionFilter: {compressionFilter}")
 
     filter_ops = {}
 
-    shuffleFilter = getShuffleFilter(dset_json)
-    if shuffleFilter and item_size != "H5T_VARIABLE":
+    shuffleFilter = getShuffleFilter(filters)
+
+    if shuffleFilter and not isVlen(dtype):
         shuffle_name = shuffleFilter["name"]
         if shuffle_name == "shuffle":
             filter_ops["shuffle"] = 1  # use regular shuffle
@@ -182,7 +199,9 @@
             filter_ops["level"] = int(compressionFilter["level"])
 
     if filter_ops:
-        filter_ops["item_size"] = item_size
+        # save the chunk shape and dtype
+        filter_ops["chunk_shape"] = chunk_shape
+        filter_ops["dtype"] = dtype
         log.debug(f"save filter ops: {filter_ops} for {dset_id}")
         filter_map[dset_id] = filter_ops  # save
 
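getFilterOps() now caches, per dataset id, a dict whose keys line up with the keyword arguments of _compress()/_uncompress() in storUtil.py, so the storage layer can expand it directly with **filter_ops. An illustrative example of a cached entry for a bitshuffle + deflate dataset (the concrete values are hypothetical):

    # illustrative only - the key names come from the getFilterOps() hunk above
    filter_ops = {
        "shuffle": 2,                  # 2 => bitshuffle, 1 => byte shuffle
        "compressor": "gzip",          # handled as "zlib" by blosc
        "level": 5,
        "chunk_shape": (1024, 1024),   # hypothetical chunk layout
        "dtype": np.dtype("int32"),
    }
    # storUtil.py then calls, e.g.:
    #     data = _compress(data, **filter_ops)
    #     data = _uncompress(data, **filter_ops)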
diff --git a/hsds/util/storUtil.py b/hsds/util/storUtil.py
index 9226c6a0..edbdc9fa 100644
--- a/hsds/util/storUtil.py
+++ b/hsds/util/storUtil.py
@@ -14,12 +14,12 @@
 # storage access functions.
 # Abstracts S3 API vs Azure vs Posix storage access
 #
-import time
 import json
+import time
 import zlib
 import numpy as np
 import numcodecs as codecs
-from bitshuffle import bitshuffle, bitunshuffle
+import bitshuffle
 from aiohttp.web_exceptions import HTTPInternalServerError
 
 from .. import hsds_logger as log
@@ -42,7 +42,6 @@ def FileClient(app):
         log.error("ImportError for FileClient")
         return None
 
 
-from .. import config
 
 BYTE_SHUFFLE = 1
@@ -88,45 +87,101 @@ def setBloscThreads(nthreads):
 
 def getBloscThreads():
     """Get the number of blosc threads to be used for compression"""
     nthreads = codecs.blosc.get_nthreads()
-
     return nthreads
 
 
-def _shuffle(codec, data, item_size=4):
+def _shuffle(codec, data, chunk_shape=None, dtype=None):
+    item_size = dtype.itemsize
+    chunk_size = int(np.prod(chunk_shape)) * item_size
+    block_size = None
     if codec == 1:
         # byte shuffle, use numcodecs Shuffle
         shuffler = codecs.Shuffle(item_size)
         arr = shuffler.encode(data)
+        return arr.tobytes()
     elif codec == 2:
         # bit shuffle, use bitshuffle package
-        if isinstance(data, bytes):
-            # bitshufle is expecting numpy array
-            data = np.frombuffer(data, dtype=np.dtype("uint8"))
-        arr = bitshuffle(data)
+        # bitshuffle is expecting a numpy array
+        # todo - enable block size to be set as part of the filter options
+        block_size = config.get("bit_shuffle_default_blocksize", default=2048)
+
+        data = np.frombuffer(data, dtype=dtype)
+        data = data.reshape(chunk_shape)
+        log.debug(f"bitshuffle.compress_lz4 - chunk_size: {chunk_size} block_size: {block_size}")
+        arr = bitshuffle.compress_lz4(data, block_size)
+
     else:
         log.error(f"Unexpected codec: {codec} for _shuffle")
         raise ValueError()
 
-    return arr.tobytes()
+    arr_bytes = arr.tobytes()
+    if block_size:
+        # prepend a 12 byte header with:
+        #   uint64 value of chunk_size
+        #   uint32 value of block_size
+
+        # unfortunately we need to do a data copy here -
+        # don't see a way to prepend to the bytes we
+        # get from numpy
+        buffer = bytearray(len(arr_bytes) + 12)
+        buffer[0:8] = int(chunk_size).to_bytes(8, "big")
+        buffer[8:12] = int(block_size * item_size).to_bytes(4, "big")
+        buffer[12:] = arr_bytes
+        arr_bytes = bytes(buffer)
+
+    return arr_bytes
+
+
+def _unshuffle(codec, data, dtype=None, chunk_shape=None):
+    item_size = dtype.itemsize
+    chunk_size = int(np.prod(chunk_shape)) * item_size
 
-def _unshuffle(codec, data, item_size=4):
     if codec == 1:
         # byte shuffle, use numcodecs Shuffle
         shuffler = codecs.Shuffle(item_size)
         arr = shuffler.decode(data)
     elif codec == 2:
         # bit shuffle, use bitshuffle
-        if isinstance(data, bytes):
-            # bitshufle is expecting numpy array
-            data = np.frombuffer(data, dtype=np.dtype("uint8"))
-        arr = bitunshuffle(data)
+        # bitshuffle is expecting a numpy array
+        data = np.frombuffer(data, dtype=np.dtype("uint8"))
+        if len(data) < 12:
+            # there should be at least 12 bytes for the header
+            msg = f"got {len(data)} bytes for bitshuffle, "
+            msg += "expected at least 12 bytes"
+            log.error(msg)
+            raise HTTPInternalServerError()
+
+        # use lz4 uncompress with bitshuffle
+        total_nbytes = int.from_bytes(data[:8], "big")
+        block_nbytes = int.from_bytes(data[8:12], "big")
+        if total_nbytes != chunk_size:
+            msg = f"header reports total_nbytes to be {total_nbytes} bytes, "
+            msg += f"expected {chunk_size} bytes"
+            log.error(msg)
+            raise HTTPInternalServerError()
+
+        # header has block size, so use that
+        block_size = block_nbytes // dtype.itemsize
+        msg = f"got bitshuffle header - total_nbytes: {total_nbytes}, "
+        msg += f"block_nbytes: {block_nbytes}, block_size: {block_size}"
+        log.debug(msg)
+        data = data[12:]
+
+        try:
+            arr = bitshuffle.decompress_lz4(data, chunk_shape, dtype, block_size)
+        except Exception as e:
+            log.error(f"exception using bitshuffle.decompress_lz4: {e}")
+            raise HTTPInternalServerError()
 
     return arr.tobytes()
 
 
-def _uncompress(data, compressor=None, shuffle=0, item_size=4):
+def _uncompress(data, compressor=None, shuffle=0, level=None, dtype=None, chunk_shape=None):
     """ Uncompress the provided data using compessor and/or shuffle
     """
-    log.debug(f"_uncompress(compressor={compressor}, shuffle={shuffle})")
+    msg = f"_uncompress(compressor={compressor}, shuffle={shuffle})"
+    if level is not None:
+        msg += f", level: {level}"
+    log.debug(msg)
+    start_time = time.time()
     if compressor:
         if compressor in ("gzip", "deflate"):
             # blosc referes to this as zlib
@@ -164,42 +219,56 @@
             log.error(msg)
             raise HTTPInternalServerError()
     if shuffle:
-        start_time = time.time()
-        data = _unshuffle(shuffle, data, item_size=item_size)
-        finish_time = time.time()
-        elapsed = finish_time - start_time
-        msg = f"unshuffled {len(data)} bytes, {(elapsed):.2f} elapsed"
-        log.debug(msg)
+        data = _unshuffle(shuffle, data, dtype=dtype, chunk_shape=chunk_shape)
+    finish_time = time.time()
+    elapsed = finish_time - start_time
+    msg = f"uncompressed {len(data)} bytes, {(elapsed):.3f}s elapsed"
+    log.debug(msg)
     return data
 
 
-def _compress(data, compressor=None, clevel=5, shuffle=0, item_size=4):
-    log.debug(f"_uncompress(compressor={compressor}, shuffle={shuffle})")
+def _compress(data, compressor=None, level=5, shuffle=0, dtype=None, chunk_shape=None):
+    if not compressor and shuffle != 2:
+        # nothing to do
+        return data
+    log.debug(f"_compress(compressor={compressor}, shuffle={shuffle})")
+    start_time = time.time()
+    data_size = len(data)
+    cdata = None
     if shuffle == 2:
         # bit shuffle the data before applying the compressor
         log.debug("bitshuffling data")
-        data = _shuffle(shuffle, data, item_size=item_size)
+        try:
+            data = _shuffle(shuffle, data, dtype=dtype, chunk_shape=chunk_shape)
+        except Exception as e:
+            log.error(f"got exception using bitshuffle: {e}")
         shuffle = 0  # don't do any blosc shuffling
     if compressor:
         if compressor in ("gzip", "deflate"):
             # blosc referes to this as zlib
             compressor = "zlib"
-        cdata = None
         # try with blosc compressor
         try:
-            blosc = codecs.Blosc(cname=compressor, clevel=clevel, shuffle=shuffle)
+            blosc = codecs.Blosc(cname=compressor, clevel=level, shuffle=shuffle)
             cdata = blosc.encode(data)
             msg = f"compressed from {len(data)} bytes to {len(cdata)} bytes "
             msg += f"using filter: {blosc.cname} with level: {blosc.clevel}"
             log.info(msg)
         except Exception as e:
             log.error(f"got exception using blosc encoding: {e}")
-            raise HTTPInternalServerError()
+    else:
+        # no compressor, just pass back the shuffled data
+        cdata = data
 
-        if cdata is not None:
-            data = cdata  # used compress data
+    if cdata is not None:
+        finish_time = time.time()
+        elapsed = finish_time - start_time
+        ratio = data_size * 100.0 / len(cdata)
+        msg = f"compressed {data_size} bytes to {len(cdata)} bytes, "
+        msg += f"ratio: {ratio:.2f}%, {(elapsed):.3f}s elapsed"
+        log.debug(msg)
+        data = cdata  # use compressed data
 
     return data
 
@@ -348,31 +417,7 @@
     msg = f"getStorBytes({bucket}/{key}, offset={offset}, length: {length})"
     log.info(msg)
 
-    shuffle = 0
-    item_size = 4
-    compressor = None
-    if filter_ops:
-        log.debug(f"getStorBytes for {key} with filter_ops: {filter_ops}")
-        if "shuffle" in filter_ops:
-            shuffle = filter_ops["shuffle"]
-            if shuffle == 1:
-                log.debug("using shuffle filter")
-            elif shuffle == 2:
-                log.debug("using bitshuffle filter")
-            else:
-                log.debug("no shuffle filter")
-        else:
-            log.debug("shuffle filter not set in filter_ops")
-
-        if "compressor" in filter_ops:
-            compressor = filter_ops["compressor"]
-            log.debug(f"using compressor: {compressor}")
-        else:
-            log.debug("compressor not set in filter ops")
-        item_size = filter_ops["item_size"]
filter_ops["item_size"] - kwargs = {"bucket": bucket, "key": key, "offset": offset, "length": length} - data = await client.get_object(**kwargs) if data is None or len(data) == 0: log.info(f"no data found for {key}") @@ -394,6 +439,7 @@ async def getStorBytes(app, raise HTTPInternalServerError() if len(chunk_locations) * h5_size < len(chunk_bytes): log.error(f"getStorBytes - invalid chunk_bytes length: {len(chunk_bytes)}") + for chunk_location in chunk_locations: log.debug(f"getStoreBytes - processing chunk_location: {chunk_location}") n = chunk_location.offset - offset @@ -403,8 +449,10 @@ async def getStorBytes(app, m = n + chunk_location.length log.debug(f"getStorBytes - extracting chunk from data[{n}:{m}]") h5_bytes = data[n:m] - kwargs = {"compressor": compressor, "shuffle": shuffle, "item_size": item_size} - h5_bytes = _uncompress(h5_bytes, **kwargs) + + if filter_ops: + h5_bytes = _uncompress(h5_bytes, **filter_ops) + if len(h5_bytes) != h5_size: msg = f"expected chunk index: {chunk_location.index} to have size: " msg += f"{h5_size} but got: {len(h5_bytes)}" @@ -420,8 +468,11 @@ async def getStorBytes(app, chunk_bytes[hs_offset:(hs_offset + h5_size)] = h5_bytes # chunk_bytes got updated, so just return None return None + elif filter_ops: + # uncompress and return + data = _uncompress(data, **filter_ops) + return data else: - data = _uncompress(data, compressor=compressor, shuffle=shuffle, item_size=item_size) return data @@ -433,23 +484,11 @@ async def putStorBytes(app, key, data, filter_ops=None, bucket=None): bucket = app["bucket_name"] if key[0] == "/": key = key[1:] # no leading slash - shuffle = 0 - clevel = 5 - cname = None # compressor name - item_size = 4 - if filter_ops: - if "compressor" in filter_ops: - cname = filter_ops["compressor"] - if "shuffle" in filter_ops: - shuffle = filter_ops["shuffle"] - if "level" in filter_ops: - clevel = filter_ops["level"] - item_size = filter_ops["item_size"] - msg = f"putStorBytes({bucket}/{key}), {len(data)} bytes shuffle: {shuffle}" - msg += f" compressor: {cname} level: {clevel}, item_size: {item_size}" - log.info(msg) - data = _compress(data, compressor=cname, clevel=clevel, shuffle=shuffle, item_size=item_size) + log.info(f"putStorBytes({bucket}/{key}), {len(data)}") + + if filter_ops: + data = _compress(data, **filter_ops) rsp = await client.put_object(key, data, bucket=bucket) diff --git a/tests/integ/filter_test.py b/tests/integ/filter_test.py index 3e39ccf0..76c7c057 100755 --- a/tests/integ/filter_test.py +++ b/tests/integ/filter_test.py @@ -11,6 +11,7 @@ ############################################################################## import unittest import json +import hashlib import numpy as np import helper import config @@ -217,6 +218,61 @@ def testShuffleAndDeflate(self): self.assertEqual(len(row), 1) self.assertEqual(row[0], 22) + def testBitShuffle(self): + # test Dataset with creation property list + print("testBitShuffle", self.base_domain) + headers = helper.getRequestHeaders(domain=self.base_domain) + # get domain + req = helper.getEndpoint() + "/" + rsp = self.session.get(req, headers=headers) + rspJson = json.loads(rsp.text) + self.assertTrue("root" in rspJson) + root_uuid = rspJson["root"] + + # create the dataset + req = self.endpoint + "/datasets" + + # Create ~1MB dataset + payload = {"type": "H5T_STD_I32LE", "shape": [1024, 1024]} + + # bit shuffle + bitshuffle_filter = {"class": "H5Z_FILTER_BITSHUFFLE", "id": 32008, "name": "bitshuffle"} + payload["creationProperties"] = {"filters": [bitshuffle_filter, ]} + 
diff --git a/tests/integ/filter_test.py b/tests/integ/filter_test.py
index 3e39ccf0..76c7c057 100755
--- a/tests/integ/filter_test.py
+++ b/tests/integ/filter_test.py
@@ -11,6 +11,7 @@
 ##############################################################################
 import unittest
 import json
+import hashlib
 import numpy as np
 import helper
 import config
@@ -217,6 +218,61 @@ def testShuffleAndDeflate(self):
         self.assertEqual(len(row), 1)
         self.assertEqual(row[0], 22)
 
+    def testBitShuffle(self):
+        # test Dataset with creation property list
+        print("testBitShuffle", self.base_domain)
+        headers = helper.getRequestHeaders(domain=self.base_domain)
+        # get domain
+        req = helper.getEndpoint() + "/"
+        rsp = self.session.get(req, headers=headers)
+        rspJson = json.loads(rsp.text)
+        self.assertTrue("root" in rspJson)
+        root_uuid = rspJson["root"]
+
+        # create the dataset
+        req = self.endpoint + "/datasets"
+
+        # Create ~1MB dataset
+        payload = {"type": "H5T_STD_I32LE", "shape": [1024, 1024]}
+
+        # bit shuffle
+        bitshuffle_filter = {"class": "H5Z_FILTER_BITSHUFFLE", "id": 32008, "name": "bitshuffle"}
+        payload["creationProperties"] = {"filters": [bitshuffle_filter, ]}
+
+        req = self.endpoint + "/datasets"
+        rsp = self.session.post(req, data=json.dumps(payload), headers=headers)
+        self.assertEqual(rsp.status_code, 201)  # create dataset
+        rspJson = json.loads(rsp.text)
+        dset_uuid = rspJson["id"]
+        self.assertTrue(helper.validateId(dset_uuid))
+
+        # link new dataset as 'dset'
+        name = "dset"
+        req = self.endpoint + "/groups/" + root_uuid + "/links/" + name
+        payload = {"id": dset_uuid}
+        rsp = self.session.put(req, data=json.dumps(payload), headers=headers)
+        self.assertEqual(rsp.status_code, 201)
+
+        # write a horizontal strip of 22s
+        req = self.endpoint + "/datasets/" + dset_uuid + "/value"
+        data = [22] * 1024
+        payload = {"start": [512, 0], "stop": [513, 1024], "value": data}
+        rsp = self.session.put(req, data=json.dumps(payload), headers=headers)
+        self.assertEqual(rsp.status_code, 200)
+
+        # read back the 512,512 element
+        req = self.endpoint + "/datasets/" + dset_uuid + "/value"  # test
+        params = {"select": "[512:513,512:513]"}  # read 1 element
+        rsp = self.session.get(req, params=params, headers=headers)
+        self.assertEqual(rsp.status_code, 200)
+        rspJson = json.loads(rsp.text)
+        self.assertTrue("hrefs" in rspJson)
+        self.assertTrue("value" in rspJson)
+        value = rspJson["value"]
+        self.assertEqual(len(value), 1)
+        row = value[0]
+        self.assertEqual(len(row), 1)
+        self.assertEqual(row[0], 22)
+
     def testBitShuffleAndDeflate(self):
         # test Dataset with creation property list
         print("testBitShuffleAndDeflate", self.base_domain)
@@ -386,6 +442,87 @@ def testDeshuffling(self):
             f'Different values for "{name}" dataset',
         )
 
+    def testBitDeshuffling(self):
+        """Test the bitshuffle filter implementation used with a known data file."""
+        print("testBitDeshuffling", self.base_domain)
+        headers = helper.getRequestHeaders(domain=self.base_domain)
+
+        # offset and size of the file's one and only chunk
+        CHUNK_OFFSET = 6864
+        CHUNK_SIZE = 3432249
+        CHUNK_SHAPE = (1, 2167, 2070)
+        CHUNK_HASH = "5595cc6303dde20228fe9a0dc23c2a75"
+        # Sample file URI...
+        hdf5_sample_bucket = config.get("hdf5_sample_bucket")
+        furi = f"{hdf5_sample_bucket}/data/hdf5test/bitshuffle.h5"
+
+        hdf5_sample_bucket = config.get("hdf5_sample_bucket")
+        if not hdf5_sample_bucket:
+            print("hdf5_sample_bucket config not set, skipping testBitDeshuffling")
+            return
+
+        # Get domain
+        req = helper.getEndpoint() + "/"
+        rsp = self.session.get(req, headers=headers)
+        rspJson = json.loads(rsp.text)
+        self.assertTrue("root" in rspJson, '"root" JSON key missing')
+        root_uuid = rspJson["root"]
+
+        # Create the HSDS dataset that points to the test shuffled data...
+        payload = {
+            "type": {"base": "H5T_STD_U32LE", "class": "H5T_INTEGER"},
+            "shape": CHUNK_SHAPE,
+            "creationProperties":
+            {
+                "filters":
+                [
+                    {"class": "H5Z_FILTER_BITSHUFFLE", "id": 32008, "name": "bitshuffle"}
+                ],
+                "layout":
+                {
+                    "class": "H5D_CHUNKED_REF",
+                    "file_uri": furi,
+                    "dims": CHUNK_SHAPE,
+                    "chunks": {"0_0_0": (CHUNK_OFFSET, CHUNK_SIZE)}
+                },
+            },
+        }
+        req = self.endpoint + "/datasets"
+        rsp = self.session.post(req, data=json.dumps(payload), headers=headers)
+        self.assertEqual(rsp.status_code, 201, rsp.text)
+        rspJson = json.loads(rsp.text)
+        dset_uuid = rspJson["id"]
+        self.assertTrue(helper.validateId(dset_uuid))
+        req = self.endpoint + "/groups/" + root_uuid + "/links/" + "dset"
+        payload = {"id": dset_uuid}
+        rsp = self.session.put(req, data=json.dumps(payload), headers=headers)
+        self.assertEqual(rsp.status_code, 201, rsp.text)
+
+        # Read dataset's test values...
+        req = self.endpoint + "/datasets/" + dset_uuid + "/value"
+
+        headers_bin_rsp = helper.getRequestHeaders(domain=self.base_domain)
+        headers_bin_rsp["accept"] = "application/octet-stream"
+
+        rsp = self.session.get(req, headers=headers_bin_rsp)
+        if rsp.status_code == 404:
+            print(f"File object: {furi} not found, skipping bitshuffle filter test")
+            return
+        self.assertEqual(rsp.status_code, 200)
+        self.assertEqual(rsp.headers["Content-Type"], "application/octet-stream")
+        data = rsp.content
+        expected_bytes = np.prod(CHUNK_SHAPE) * 4
+        self.assertEqual(len(data), expected_bytes)
+
+        arr = np.frombuffer(data, dtype=np.uint32)
+        arr = arr.reshape(CHUNK_SHAPE)
+        # if it's all zeros, it's likely the rangeget request failed
+        self.assertTrue(arr.max() > 0)
+        # compare the hash of the values we got with the expected hash
+        hash_object = hashlib.md5(data)
+        md5_hash = hash_object.hexdigest()
+        self.assertEqual(md5_hash, CHUNK_HASH)
+
 
 if __name__ == "__main__":
     # setup test files
diff --git a/tests/unit/compression_test.py b/tests/unit/compression_test.py
index 06fd57cf..7a7cc932 100755
--- a/tests/unit/compression_test.py
+++ b/tests/unit/compression_test.py
@@ -24,7 +24,7 @@ def __init__(self, *args, **kwargs):
     # main
 
     def testCompression(self):
-        shape = 1_000_000
+        shape = (1_000_000, )
         dt = np.dtype("