Bitshuffle2 (HDFGroup#291)
* bitshuffle with lz4

* store bitshuffle header

* added more test cases

* fixed fstring
jreadey authored Dec 1, 2023
1 parent 6caf74e commit ae75bfa
Showing 7 changed files with 321 additions and 112 deletions.
1 change: 1 addition & 0 deletions admin/config/config.yml
@@ -83,6 +83,7 @@ dn_retry_backoff_exp: 0.1 # backoff factor for retries
xss_protection: "1; mode=block" # Include in response headers if set
allow_any_bucket_read: true # enable reads to buckets other than default bucket
allow_any_bucket_write: true # enable writes to buckets other than default bucket
bit_shuffle_default_blocksize: 2048 # default blocksize for bitshuffle filter
# DEPRECATED - the remaining config values are not currently used but are kept for backward compatibility with older container images
aws_lambda_chunkread_function: null # name of aws lambda function for chunk reading
aws_lambda_threshold: 4 # number of chunks per node per request to reach before using lambda
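
The new bit_shuffle_default_blocksize key gives the service a fallback block size to use when a dataset's bitshuffle filter does not specify one. A minimal sketch of the intent, assuming the bitshuffle package's compress_lz4(array, blocksize) API; the helper below is illustrative, not code from this commit (in HSDS the value would presumably be read through the config module):

import bitshuffle
import numpy as np

BIT_SHUFFLE_DEFAULT_BLOCKSIZE = 2048  # mirrors the config value above

def bitshuffle_compress(chunk_arr: np.ndarray, blocksize: int = 0):
    # fall back to the configured default when the filter gave no blocksize
    if not blocksize:
        blocksize = BIT_SHUFFLE_DEFAULT_BLOCKSIZE
    # bit-transpose the elements, then LZ4-compress each block of elements
    return bitshuffle.compress_lz4(chunk_arr, blocksize)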
27 changes: 17 additions & 10 deletions hsds/datanode_lib.py
@@ -30,10 +30,10 @@
from .util.attrUtil import getRequestCollectionName
from .util.httpUtil import http_post
from .util.dsetUtil import getChunkLayout, getFilterOps, getShapeDims
from .util.dsetUtil import getChunkInitializer, getSliceQueryParam
from .util.dsetUtil import getChunkInitializer, getSliceQueryParam, getFilters
from .util.chunkUtil import getDatasetId, getChunkSelection, getChunkIndex
from .util.arrayUtil import arrayToBytes, bytesToArray, jsonToArray
from .util.hdf5dtype import createDataType, getItemSize
from .util.hdf5dtype import createDataType
from .util.rangegetUtil import ChunkLocation, chunkMunge

from . import config
@@ -810,13 +810,15 @@ async def get_chunk_bytes(
bucket=None,
offset=0,
length=0,
item_size=0,
dtype=None,
chunk_id=None,
chunk_dims=None,
hyper_dims=None
):
""" For regular chunk reads, just call getStorBytes.
"""
item_size = dtype.itemsize

msg = f"get_chunk_bytes({chunk_id}, bucket={bucket}, offset={offset}, length={length}, "
msg += f"item_size={item_size}, chunk_dims={chunk_dims}, hyper_dims={hyper_dims}"
log.debug(msg)
@@ -978,20 +980,21 @@ async def get_chunk(
chunk_arr = None
dims = getChunkLayout(dset_json)
type_json = dset_json["type"]
item_size = getItemSize(type_json)
dt = createDataType(type_json)
layout_json = dset_json["layout"]
log.debug(f"dset_json: {dset_json}")
layout_class = layout_json.get("class")
chunk_dims = getChunkLayout(dset_json)

dt = createDataType(type_json)
# note - officially we should follow the order in which the filters are
# defined in the filter_list,
# but since we currently have just deflate and shuffle we will always
# apply deflate then shuffle on read, and shuffle then deflate on write
# also note - get deflate and shuffle will update the deflate and
# shuffle map so that the s3sync will do the right thing
filter_ops = getFilterOps(app, dset_json, item_size)

filters = getFilters(dset_json)
dset_id = dset_json["id"]
filter_ops = getFilterOps(app, dset_id, filters, dtype=dt, chunk_shape=chunk_dims)
log.debug(f"filter_ops: {filter_ops}")

if s3path:
@@ -1049,7 +1052,7 @@
"filter_ops": filter_ops,
"offset": s3offset,
"length": s3size,
"item_size": item_size,
"dtype": dt,
"chunk_dims": chunk_dims,
"hyper_dims": hyper_dims,
"bucket": bucket,
@@ -1144,9 +1147,13 @@ def save_chunk(app, chunk_id, dset_json, chunk_arr, bucket=None):
log.error(f"Chunk {chunk_id} not in partition")
raise HTTPInternalServerError()

item_size = getItemSize(dset_json["type"])
dset_id = dset_json["id"]
dtype = createDataType(dset_json["type"])
chunk_shape = getChunkLayout(dset_json)

# will store filter options into app['filter_map']
getFilterOps(app, dset_json, item_size)
filters = getFilters(dset_json)
getFilterOps(app, dset_id, filters, dtype=dtype, chunk_shape=chunk_shape)

chunk_cache = app["chunk_cache"]
if chunk_id not in chunk_cache:
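Passing the numpy dtype and chunk shape (rather than a bare item_size) down to get_chunk_bytes and getFilterOps is what allows the read path to undo bitshuffle + LZ4, which needs both to rebuild the chunk array. A rough sketch of that decode step, assuming the bitshuffle package's decompress_lz4(buf, shape, dtype) API; the names below are illustrative, not the code in this commit:

import bitshuffle
import numpy as np

def unshuffle_chunk(compressed: bytes, filter_ops: dict) -> np.ndarray:
    # filter_ops now carries "dtype" and "chunk_shape" (see dsetUtil.getFilterOps below);
    # the bitshuffle header this commit stores with the compressed bytes is assumed
    # to have been parsed and stripped already
    dtype = filter_ops["dtype"]
    shape = filter_ops["chunk_shape"]
    buf = np.frombuffer(compressed, dtype=np.uint8)
    # undoing the bit transpose needs the element type and chunk shape,
    # not just the per-element byte size
    return bitshuffle.decompress_lz4(buf, shape, dtype)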
39 changes: 29 additions & 10 deletions hsds/util/dsetUtil.py
@@ -75,6 +75,24 @@
)


# copied from arrayUtil.py
def isVlen(dt):
"""
Return True if the type contains variable length elements
"""
is_vlen = False
if len(dt) > 1:
names = dt.names
for name in names:
if isVlen(dt[name]):
is_vlen = True
break
else:
if dt.metadata and "vlen" in dt.metadata:
is_vlen = True
return is_vlen
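
# Example usage (illustrative, not part of this module): the "vlen" key in the
# dtype metadata is how h5py-style variable-length types are tagged, and compound
# types are checked field by field.
#   isVlen(np.dtype("O", metadata={"vlen": str}))   # True  - vlen string
#   isVlen(np.dtype("int32"))                       # False - fixed-size element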


def getFilterItem(key):
"""
Return filter code, id, and name, based on an id, a name or a code.
@@ -100,9 +118,8 @@ def getFilters(dset_json):
return filters


def getCompressionFilter(dset_json):
def getCompressionFilter(filters):
"""Return compression filter from filters, or None"""
filters = getFilters(dset_json)
for filter in filters:
if "class" not in filter:
msg = f"filter option: {filter} with no class key"
@@ -122,9 +139,8 @@ def getCompressionFilter(dset_json):
return None


def getShuffleFilter(dset_json):
def getShuffleFilter(filters):
"""Return shuffle filter, or None"""
filters = getFilters(dset_json)
FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")
for filter in filters:
log.debug(f"filter: {filter}")
@@ -142,21 +158,22 @@ def getShuffleFilter(dset_json):
return None


def getFilterOps(app, dset_json, item_size):
def getFilterOps(app, dset_id, filters, dtype=None, chunk_shape=None):
"""Get list of filter operations to be used for this dataset"""
filter_map = app["filter_map"]
dset_id = dset_json["id"]

if dset_id in filter_map:
log.debug(f"returning filter from filter_map {filter_map[dset_id]}")
return filter_map[dset_id]

compressionFilter = getCompressionFilter(dset_json)
compressionFilter = getCompressionFilter(filters)
log.debug(f"got compressionFilter: {compressionFilter}")

filter_ops = {}

shuffleFilter = getShuffleFilter(dset_json)
if shuffleFilter and item_size != "H5T_VARIABLE":
shuffleFilter = getShuffleFilter(filters)

if shuffleFilter and not isVlen(dtype):
shuffle_name = shuffleFilter["name"]
if shuffle_name == "shuffle":
filter_ops["shuffle"] = 1 # use regular shuffle
@@ -182,7 +199,9 @@ def getFilterOps(app, dset_json, item_size):
filter_ops["level"] = int(compressionFilter["level"])

if filter_ops:
filter_ops["item_size"] = item_size
# save the chunk shape and dtype
filter_ops["chunk_shape"] = chunk_shape
filter_ops["dtype"] = dtype
log.debug(f"save filter ops: {filter_ops} for {dset_id}")
filter_map[dset_id] = filter_ops # save

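With the new signature, callers resolve the dataset's filter list first, and getFilterOps caches the resulting options, now including chunk_shape and dtype, in app["filter_map"] keyed by dataset id. A hedged sketch of the calling convention, pieced together from the call sites above (the dict values shown are examples only):

filters = getFilters(dset_json)
dset_id = dset_json["id"]
dt = createDataType(dset_json["type"])
chunk_dims = getChunkLayout(dset_json)
filter_ops = getFilterOps(app, dset_id, filters, dtype=dt, chunk_shape=chunk_dims)
# e.g. {"compressor": "lz4", "level": 5, "shuffle": 1,
#       "chunk_shape": chunk_dims, "dtype": dt}  (cached in app["filter_map"][dset_id])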