diff --git a/hsds/basenode.py b/hsds/basenode.py index c1802d5b..4ae3dfed 100644 --- a/hsds/basenode.py +++ b/hsds/basenode.py @@ -33,7 +33,7 @@ from .util.k8sClient import getDnLabelSelector, getPodIps from . import hsds_logger as log -HSDS_VERSION = "0.8.4" +HSDS_VERSION = "0.8.5" def getVersion(): diff --git a/hsds/chunk_crawl.py b/hsds/chunk_crawl.py index a66ce720..b131ca9b 100755 --- a/hsds/chunk_crawl.py +++ b/hsds/chunk_crawl.py @@ -84,7 +84,7 @@ async def write_chunk_hyperslab( if not bucket: bucket = config.get("bucket_name") - msg = f"write_chunk_hyperslab, chunk_id:{chunk_id}, slices:{slices}, " + msg = f"write_chunk_hyperslab, chunk_id: {chunk_id}, slices: {slices}, " msg += f"bucket: {bucket}" log.info(msg) if "layout" not in dset_json: @@ -95,11 +95,20 @@ async def write_chunk_hyperslab( log.debug(f"using partition_chunk_id: {partition_chunk_id}") chunk_id = partition_chunk_id # replace the chunk_id + params = {} + if "type" not in dset_json: log.error(f"No type found in dset_json: {dset_json}") raise HTTPInternalServerError() + type_json = dset_json["type"] + dset_dtype = createDataType(type_json) + + if len(arr.dtype) < len(dset_dtype): + # field selection, pass in the field names + fields_param = ":".join(arr.dtype.names) + log.debug(f"setting fields_param to: {fields_param}") + params["fields"] = fields_param - params = {} layout = getChunkLayout(dset_json) log.debug(f"getChunkCoverage({chunk_id}, {slices}, {layout})") chunk_sel = getChunkCoverage(chunk_id, slices, layout) @@ -154,6 +163,7 @@ async def read_chunk_hyperslab( chunk_id, dset_json, np_arr, + select_dtype=None, query=None, query_update=None, limit=0, @@ -178,6 +188,10 @@ async def read_chunk_hyperslab( log.error("expected chunk_map to be set") return + if np_arr is None and select_dtype is None: + log.error("expected np_arr to be set") + return + msg = f"read_chunk_hyperslab, chunk_id: {chunk_id}," msg += f" bucket: {bucket}" if query is not None: @@ -187,7 +201,18 @@ async def read_chunk_hyperslab( log.warn(f"expected to find {chunk_id} in chunk_map") return chunk_info = chunk_map[chunk_id] - log.debug(f"using chunk_map entry for {chunk_id}: {chunk_info}") + log.debug(f"using chunk_map entry for {chunk_id}") + if "points" in chunk_info: + points = chunk_info["points"] + log.debug(f"chunkinfo {len(points)} points") + elif "chunk_sel" in chunk_info: + chunk_sel = chunk_info["chunk_sel"] + log.debug(f"chunkinfo - chunk_sel: {chunk_sel}") + elif "data_sel" in chunk_info: + data_sel = chunk_info["data_sel"] + log.debug(f"chunkinfo - data_sel: {data_sel}") + else: + log.warn(f"unexpected chunkinfo: {chunk_info}") partition_chunk_id = getChunkIdForPartition(chunk_id, dset_json) if partition_chunk_id != chunk_id: @@ -197,6 +222,8 @@ async def read_chunk_hyperslab( if "type" not in dset_json: log.error(f"No type found in dset_json: {dset_json}") raise HTTPInternalServerError() + type_json = dset_json["type"] + dset_dt = createDataType(type_json) chunk_shape = None # expected return array shape chunk_sel = None # for hyperslab @@ -224,17 +251,16 @@ async def read_chunk_hyperslab( raise HTTPInternalServerError() point_index = chunk_info["indices"] method = "POST" - chunk_shape = [ - len(point_list), - ] + chunk_shape = [len(point_list), ] log.debug(f"point selection - chunk_shape: {chunk_shape}") - type_json = dset_json["type"] - dt = createDataType(type_json) + if select_dtype is None and np_arr is not None: + select_dtype = np_arr.dtype + if query is None and query_update is None: query_dtype = None else: - 
query_dtype = getQueryDtype(dt) + query_dtype = getQueryDtype(select_dtype) chunk_arr = None array_data = None @@ -265,6 +291,13 @@ async def read_chunk_hyperslab( # convert to colon seperated string hyper_dims = ":".join(map(str, hyper_dims)) params["hyper_dims"] = hyper_dims + if len(select_dtype) < len(dset_dt): + # field selection, pass in the field names + fields_param = ":".join(select_dtype.names) + log.debug(f"setting fields param to: {fields_param}") + params["fields"] = fields_param + else: + log.debug("no fields param") # set query-based params if query is not None: @@ -323,14 +356,10 @@ async def read_chunk_hyperslab( array_data = await http_get(app, req, params=params, client=client) log.debug(f"http_get {req}, returned {len(array_data)} bytes") elif method == "PUT": - array_data = await http_put( - app, req, data=body, params=params, client=client - ) + array_data = await http_put(app, req, data=body, params=params, client=client) log.debug(f"http_put {req}, returned {len(array_data)} bytes") else: # POST - array_data = await http_post( - app, req, data=body, params=params, client=client - ) + array_data = await http_post(app, req, data=body, params=params, client=client) log.debug(f"http_post {req}, returned {len(array_data)} bytes") except HTTPNotFound: if query is None and "s3path" in params: @@ -364,7 +393,9 @@ async def read_chunk_hyperslab( else: # convert binary data to numpy array try: - chunk_arr = bytesToArray(array_data, dt, chunk_shape) + log.debug(f"np_arr.dtype: {np_arr.dtype}") + log.debug(f"chunk_shape: {chunk_shape}") + chunk_arr = bytesToArray(array_data, np_arr.dtype, chunk_shape) except ValueError as ve: log.warn(f"bytesToArray ValueError: {ve}") raise HTTPBadRequest() @@ -377,9 +408,9 @@ async def read_chunk_hyperslab( raise HTTPInternalServerError() chunk_arr = chunk_arr.reshape(chunk_shape) - log.info(f"chunk_arr shape: {chunk_arr.shape}") - log.info(f"data_sel: {data_sel}") - log.info(f"np_arr shape: {np_arr.shape}") + log.debug(f"chunk_arr shape: {chunk_arr.shape}, dtype: {chunk_arr.dtype}") + log.debug(f"data_sel: {data_sel}") + log.debug(f"np_arr shape: {np_arr.shape}") if point_list is not None: # point selection @@ -593,6 +624,7 @@ def __init__( bucket=None, slices=None, arr=None, + select_dtype=None, query=None, query_update=None, limit=0, @@ -603,7 +635,10 @@ def __init__( max_tasks_per_node = config.get("max_tasks_per_node_per_request", default=16) client_pool_count = config.get("client_pool_count", default=10) log.info(f"ChunkCrawler.__init__ {len(chunk_ids)} chunks, action={action}") - log.debug(f"ChunkCrawler - chunk_ids: {chunk_ids}") + if len(chunk_ids) < 10: + log.debug(f"ChunkCrawler - chunk_ids: {chunk_ids}") + else: + log.debug(f"ChunkCrawler - chunk_ids: {chunk_ids[:10]} ...") self._app = app self._slices = slices @@ -611,6 +646,7 @@ def __init__( self._chunk_map = chunk_map self._dset_json = dset_json self._arr = arr + self._select_dtype = select_dtype self._points = points self._query = query self._query_update = query_update @@ -630,6 +666,7 @@ def __init__( self._max_tasks = max_tasks else: self._max_tasks = len(chunk_ids) + log.debug(f"ChunkCrawler max_tasks: {max_tasks}") if self._max_tasks >= client_pool_count: self._client_pool = 1 @@ -683,7 +720,8 @@ async def work(self): this_task = asyncio.current_task() task_name = this_task.get_name() log.info(f"ChunkCrawler - work method for task: {task_name}") - client_name = f"{task_name}.{random.randrange(0,self._client_pool)}" + task_suffix = random.randrange(0, self._client_pool) + 
client_name = f"{task_name}.{task_suffix}" log.info(f"ChunkCrawler - client_name: {client_name}") while True: try: @@ -739,6 +777,7 @@ async def do_work(self, chunk_id, client=None): chunk_id, self._dset_json, self._arr, + select_dtype=self._select_dtype, query=self._query, query_update=self._query_update, limit=self._limit, @@ -746,9 +785,8 @@ async def do_work(self, chunk_id, client=None): bucket=self._bucket, client=client, ) - log.debug( - f"read_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}" - ) + msg = f"read_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}" + log.debug(msg) status_code = 200 elif self._action == "write_chunk_hyperslab": await write_chunk_hyperslab( @@ -760,9 +798,9 @@ async def do_work(self, chunk_id, client=None): bucket=self._bucket, client=client, ) - log.debug( - f"write_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}" - ) + + msg = f"write_chunk_hyperslab - got 200 status for chunk_id: {chunk_id}" + log.debug(msg) status_code = 200 elif self._action == "read_point_sel": if not isinstance(self._points, dict): @@ -770,9 +808,9 @@ async def do_work(self, chunk_id, client=None): status_code = 500 break if chunk_id not in self._points: - log.error( - f"ChunkCrawler - read_point_sel, no entry for chunk: {chunk_id}" - ) + msg = "ChunkCrawler - read_point_sel, " + msg += f"no entry for chunk: {chunk_id}" + log.error(msg) status_code = 500 break item = self._points[chunk_id] @@ -790,9 +828,8 @@ async def do_work(self, chunk_id, client=None): bucket=self._bucket, client=client, ) - log.debug( - f"read_point_sel - got 200 status for chunk_id: {chunk_id}" - ) + msg = f"read_point_sel - got 200 status for chunk_id: {chunk_id}" + log.debug(msg) status_code = 200 elif self._action == "write_point_sel": if not isinstance(self._points, dict): @@ -800,9 +837,9 @@ async def do_work(self, chunk_id, client=None): status_code = 500 break if chunk_id not in self._points: - log.error( - f"ChunkCrawler - read_point_sel, no entry for chunk: {chunk_id}" - ) + msg = "ChunkCrawler - read_point_sel, " + msg += f"no entry for chunk: {chunk_id}" + log.error(msg) status_code = 500 break item = self._points[chunk_id] @@ -819,9 +856,8 @@ async def do_work(self, chunk_id, client=None): bucket=self._bucket, client=client, ) - log.debug( - f"read_point_sel - got 200 status for chunk_id: {chunk_id}" - ) + msg = f"read_point_sel - got 200 status for chunk_id: {chunk_id}" + log.debug(msg) status_code = 200 else: log.error(f"ChunkCrawler - unexpected action: {self._action}") @@ -830,9 +866,8 @@ async def do_work(self, chunk_id, client=None): except ClientError as ce: status_code = 500 - log.warn( - f"ClientError {type(ce)} for {self._action}({chunk_id}): {ce} " - ) + msg = f"ClientError {type(ce)} for {self._action}({chunk_id}): {ce} " + log.warn(msg) except CancelledError as cle: status_code = 503 log.warn(f"CancelledError for {self._action}({chunk_id}): {cle}") @@ -845,9 +880,8 @@ async def do_work(self, chunk_id, client=None): break except HTTPInternalServerError as ise: status_code = 500 - log.warn( - f"HTTPInternalServerError for {self._action}({chunk_id}): {ise}" - ) + msg = f"HTTPInternalServerError for {self._action}({chunk_id}): {ise}" + log.warn(msg) except HTTPServiceUnavailable as sue: status_code = 503 msg = f"HTTPServiceUnavailable for {self._action}({chunk_id}): {sue}" @@ -876,6 +910,5 @@ async def do_work(self, chunk_id, client=None): if "query_rsp" in item: query_rsp = item["query_rsp"] self._hits += len(query_rsp) - log.info( - f"ChunkCrawler - worker 
status for chunk {chunk_id}: {self._status_map[chunk_id]}" - ) + msg = f"ChunkCrawler - worker status for chunk {chunk_id}: {self._status_map[chunk_id]}" + log.info(msg) diff --git a/hsds/chunk_dn.py b/hsds/chunk_dn.py index 4f3da7f7..6f9fbfbc 100644 --- a/hsds/chunk_dn.py +++ b/hsds/chunk_dn.py @@ -23,7 +23,7 @@ from .util.arrayUtil import bytesToArray, arrayToBytes, getBroadcastShape from .util.idUtil import getS3Key, validateInPartition, isValidUuid from .util.storUtil import isStorObj, deleteStorObj -from .util.hdf5dtype import createDataType +from .util.hdf5dtype import createDataType, getSubType from .util.dsetUtil import getSelectionList, getChunkLayout, getShapeDims from .util.dsetUtil import getSelectionShape, getChunkInitializer from .util.chunkUtil import getChunkIndex, getDatasetId, chunkQuery @@ -60,15 +60,19 @@ async def PUT_Chunk(request): msg = "Missing chunk id" log.error(msg) raise HTTPBadRequest(reason=msg) + if not isValidUuid(chunk_id, "Chunk"): msg = f"Invalid chunk id: {chunk_id}" log.warn(msg) raise HTTPBadRequest(reason=msg) + log.debug(f"PUT_Chunk - id: {chunk_id}") + if not request.has_body: msg = "PUT Value with no body" log.warn(msg) raise HTTPBadRequest(reason=msg) + if "bucket" in params: bucket = params["bucket"] log.debug(f"PUT_Chunk using bucket: {bucket}") @@ -99,7 +103,12 @@ async def PUT_Chunk(request): log.error(msg) raise HTTPBadRequest(reason=msg) - log.debug(f"PUT_Chunk - id: {chunk_id}") + if "fields" in params: + select_fields = params["fields"].split(":") + log.debug(f"PUT_Chunk - got fields: {select_fields}") + else: + select_fields = [] + log.debug("PUT_Chunk - no select fields") # verify we have at least min_chunk_size free in the chunk cache # otherwise, have the client try a bit later @@ -118,7 +127,12 @@ async def PUT_Chunk(request): rank = len(dims) type_json = dset_json["type"] - dt = createDataType(type_json) + dset_dt = createDataType(type_json) + if select_fields: + select_dt = getSubType(dset_dt, select_fields) + else: + select_dt = dset_dt + if "size" in type_json: itemsize = type_json["size"] else: @@ -142,7 +156,7 @@ async def PUT_Chunk(request): mshape = getSelectionShape(selection) if element_count is not None: bcshape = getBroadcastShape(mshape, element_count) - log.debug(f"ussing bcshape: {bcshape}") + log.debug(f"using bcshape: {bcshape}") else: bcshape = None @@ -170,7 +184,7 @@ async def PUT_Chunk(request): raise HTTPNotFound() if query: - if not dt.fields: + if not dset_dt.fields: log.error("expected compound dtype for PUT query") raise HTTPInternalServerError() if rank != 1: @@ -239,7 +253,7 @@ async def PUT_Chunk(request): # regular chunk update # check that the content_length is what we expect if itemsize != "H5T_VARIABLE": - log.debug(f"expect content_length: {num_elements*itemsize}") + log.debug(f"expected content_length: {num_elements*itemsize}") log.debug(f"actual content_length: {request.content_length}") actual = request.content_length @@ -259,11 +273,11 @@ async def PUT_Chunk(request): log.error(msg) raise HTTPInternalServerError() - input_arr = bytesToArray(input_bytes, dt, [num_elements, ]) + input_arr = bytesToArray(input_bytes, select_dt, [num_elements, ]) if bcshape: input_arr = input_arr.reshape(bcshape) log.debug(f"broadcasting {bcshape} to mshape {mshape}") - arr_tmp = np.zeros(mshape, dtype=dt) + arr_tmp = np.zeros(mshape, dtype=select_dt) arr_tmp[...] 
= input_arr input_arr = arr_tmp else: @@ -422,6 +436,12 @@ async def GET_Chunk(request): raise HTTPInternalServerError() log.debug(f"GET_Chunk - got selection: {selection}") + if "fields" in params: + select_fields = params["fields"].split(":") + log.debug(f"GET_Chunk - got fields: {select_fields}") + else: + select_fields = [] + if getChunkInitializer(dset_json): chunk_init = True else: @@ -448,8 +468,18 @@ async def GET_Chunk(request): if chunk_init: save_chunk(app, chunk_id, dset_json, chunk_arr, bucket=bucket) - if query: + if select_fields: + try: + select_dt = getSubType(chunk_arr.dtype, select_fields) + except TypeError as te: + # this shouldn't happen, but just in case... + msg = f"invalid fields selection: {te}" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + else: + select_dt = chunk_arr.dtype + if query: try: parser = BooleanParser(query) except Exception as e: @@ -473,6 +503,7 @@ async def GET_Chunk(request): "slices": selection, "query": eval_str, "limit": limit, + "select_dt": select_dt, } output_arr = chunkQuery(**kwargs) except TypeError as te: @@ -482,13 +513,13 @@ async def GET_Chunk(request): log.warn(f"chunkQuery - ValueError: {ve}") raise HTTPBadRequest() if output_arr is None or output_arr.shape[0] == 0: - # no mathces to query + # no matches to query msg = f"chunk {chunk_id} no results for query: {query}" log.debug(msg) raise HTTPNotFound() else: # read selected data from chunk - output_arr = chunkReadSelection(chunk_arr, slices=selection) + output_arr = chunkReadSelection(chunk_arr, slices=selection, select_dt=select_dt) # write response if output_arr is not None: @@ -525,13 +556,15 @@ async def POST_Chunk(request): put_points = False select = None # for hyperslab/fancy selection + body = None num_points = 0 + select_fields = None if "count" in params: try: num_points = int(params["count"]) except ValueError: - msg = f"expected int for count param but got: {params['clount']}" + msg = f"expected int for count param but got: {params['count']}" log.warn(msg) raise HTTPBadRequest(reason=msg) else: @@ -616,7 +649,7 @@ async def POST_Chunk(request): rank = len(dims) type_json = dset_json["type"] - dset_dtype = createDataType(type_json) + dset_dt = createDataType(type_json) output_arr = None if getChunkInitializer(dset_json): @@ -627,6 +660,13 @@ async def POST_Chunk(request): # don't need for getting points chunk_init = False + if "fields" in params: + select_fields = params["fields"].split(":") + if select_fields: + select_dt = getSubType(dset_dt, select_fields) + else: + select_dt = dset_dt + if content_type == "binary": # create a numpy array for incoming points input_bytes = await request_read(request) @@ -644,7 +684,7 @@ async def POST_Chunk(request): if put_points: # create a numpy array with the following type: # (coord1, coord2, ...) | dset_dtype - type_fields = [("coord", np.dtype(coord_type_str)), ("value", dset_dtype)] + type_fields = [("coord", np.dtype(coord_type_str)), ("value", select_dt)] point_dt = np.dtype(type_fields) point_shape = (num_points,) else: @@ -659,6 +699,18 @@ async def POST_Chunk(request): raise HTTPBadRequest() select = body["select"] log.debug(f"POST_Chunk - using select string: {select}") + if "fields" in body: + if select_fields: + # this should have been caught in the chunk_sn code... 
+ msg = "POST_Chunk: got fields key in body when already given as query param" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + select_fields = body["fields"] + if isinstance(select_fields, str): + select_fields = [select_fields, ] # convert to a list + log.debug(f"POST_Chunk - got fields: {select_fields}") + select_dt = getSubType(dset_dt, select_fields) kwargs = {"chunk_init": chunk_init} if s3path: @@ -685,6 +737,7 @@ async def POST_Chunk(request): "chunk_layout": dims, "chunk_arr": chunk_arr, "point_arr": point_arr, + "select_dt": select_dt, } chunkWritePoints(**kwargs) except ValueError as ve: @@ -701,7 +754,7 @@ async def POST_Chunk(request): raise HTTPInternalServerError() log.debug(f"GET_Chunk - got selection: {selection}") # read selected data from chunk - output_arr = chunkReadSelection(chunk_arr, slices=selection) + output_arr = chunkReadSelection(chunk_arr, slices=selection, select_dt=select_dt) else: # read points @@ -711,6 +764,7 @@ async def POST_Chunk(request): "chunk_layout": dims, "chunk_arr": chunk_arr, "point_arr": point_arr, + "select_dt": select_dt } output_arr = chunkReadPoints(**kwargs) except ValueError as ve: @@ -740,8 +794,6 @@ async def POST_Chunk(request): async def DELETE_Chunk(request): """HTTP DELETE method for /chunks/ - Note: clients (i.e. SN nodes) don't directly delete chunks. - This method should only be called by the AN node. """ log.request(request) app = request.app diff --git a/hsds/chunk_sn.py b/hsds/chunk_sn.py index 14588d91..03c28fba 100755 --- a/hsds/chunk_sn.py +++ b/hsds/chunk_sn.py @@ -24,13 +24,13 @@ from aiohttp.web_exceptions import HTTPConflict, HTTPInternalServerError from aiohttp.web import StreamResponse -from .util.httpUtil import getHref, getAcceptType, getContentType, http_put +from .util.httpUtil import getHref, getAcceptType, getContentType from .util.httpUtil import request_read, jsonResponse, isAWSLambda -from .util.idUtil import isValidUuid, getDataNodeUrl +from .util.idUtil import isValidUuid from .util.domainUtil import getDomainFromRequest, isValidDomain from .util.domainUtil import getBucketForDomain -from .util.hdf5dtype import getItemSize, createDataType -from .util.dsetUtil import isNullSpace, get_slices, getShapeDims +from .util.hdf5dtype import getItemSize, getSubType, createDataType +from .util.dsetUtil import isNullSpace, isScalarSpace, get_slices, getShapeDims from .util.dsetUtil import isExtensible, getSelectionPagination from .util.dsetUtil import getSelectionShape, getDsetMaxDims, getChunkLayout from .util.chunkUtil import getNumChunks, getChunkIds, getChunkId @@ -38,9 +38,8 @@ from .util.arrayUtil import getNumElements, arrayToBytes, bytesToArray from .util.arrayUtil import squeezeArray, getBroadcastShape from .util.authUtil import getUserPasswordFromRequest, validateUserPassword -from .util.boolparser import BooleanParser from .servicenode_lib import getDsetJson, validateAction -from .dset_lib import getSelectionData +from .dset_lib import getSelectionData, getParser, extendShape from .chunk_crawl import ChunkCrawler from . import config from . 
import hsds_logger as log @@ -76,70 +75,581 @@ def use_http_streaming(request, rank): return True -async def PUT_Value(request): - """ - Handler for PUT //value request - """ - log.request(request) - app = request.app - bucket = None - body = None - query = None - json_data = None - params = request.rel_url.query - append_rows = None # this is a append update or not - append_dim = 0 - num_elements = None - element_count = None - if "append" in params and params["append"]: +def _isIgnoreNan(params, body=None): + kw = "ignore_nan" + if isinstance(body, dict) and kw in params and params[kw]: + ignore_nan = bool(body) + elif kw in params and params[kw]: + ignore_nan = True + else: + ignore_nan = False + return ignore_nan + + +def _isAppend(params, body=None): + """ return True if append values are specified in params or body """ + kw = "append" + if isinstance(body, dict) and kw in body and body[kw]: + isAppend = True + elif kw in params and params[kw]: + isAppend = True + else: + isAppend = False + return isAppend + + +def _getAppendDim(params, body=None): + append_dim_param = None + kw = "append_dim" + if isinstance(body, dict): + if kw in body: + append_dim_param = body[kw] + + if append_dim_param is None: + # check query param + if kw in params: + append_dim_param = params[kw] + + if append_dim_param is not None: try: - append_rows = int(params["append"]) + append_dim = int(append_dim_param) except ValueError: - msg = "invalid append query param" + msg = "invalid append_dim" log.warn(msg) raise HTTPBadRequest(reason=msg) - log.info(f"append_rows: {append_rows}") - if "select" in params: - msg = "select query parameter can not be used with packet updates" + log.info(f"append_dim: {append_dim}") + else: + append_dim = 0 + + return append_dim + + +def _getAppendRows(params, dset_json, body=None): + """ get append rows value from query param or body """ + append_rows = None + kw = "append" + + if isinstance(body, dict) and kw in body and body[kw]: + try: + append_rows = int(body[kw]) + except ValueError: + msg = "invalid append value in body" log.warn(msg) raise HTTPBadRequest(reason=msg) - if "append_dim" in params and params["append_dim"]: + elif kw in params and params[kw]: try: - append_dim = int(params["append_dim"]) + append_rows = int(params[kw]) except ValueError: + msg = "invalid append query param" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + else: + append_rows = None + + if append_rows: + log.info(f"append_rows: {append_rows}") + datashape = dset_json["shape"] + dims = getShapeDims(datashape) + rank = len(dims) + if rank == 0: + msg = "append can't be used in scalar or null space datasets" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + # select can't be used with append + if _isSelect(params, body=body): + msg = "select query parameter can not be used with append" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + # shape must be extensible + datashape = dset_json["shape"] + dims = getShapeDims(datashape) + rank = len(dims) + maxdims = getDsetMaxDims(dset_json) + if not isExtensible(dims, maxdims): + msg = "Dataset shape must be extensible for packet updates" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + append_dim = _getAppendDim(params, body=body) + if append_dim < 0 or append_dim > rank - 1: msg = "invalid append_dim" log.warn(msg) raise HTTPBadRequest(reason=msg) - log.info(f"append_dim: {append_dim}") - if "query" in params: - if "append" in params: + if maxdims[append_dim] != 0: + if dims[append_dim] + append_rows > maxdims[append_dim]: + 
log.warn("unable to append to dataspace") + raise HTTPConflict() + + return append_rows + + +def _isSelect(params, body=None): + """ return True if select param or select is set in request body + """ + if "select" in params and params["select"]: + return True + + if isinstance(body, dict): + if "select" in body and body["select"]: + return True + for key in ("start", "stop", "step"): + if key in body and body[key]: + return True + return False + + +def _getSelect(params, dset_json, body=None): + """ return selection region if any as a list + of slices. """ + slices = None + log.debug(f"_getSelect params: {params} body: {body}") + try: + if body and isinstance(body, dict): + if "select" in body and body["select"]: + select = body.get("select") + slices = get_slices(select, dset_json) + elif "start" in body and "stop" in body: + slices = get_slices(body, dset_json) + if "select" in params and params["select"]: + select = params.get("select") + if slices: + msg = "select defined in both request body and query parameters" + raise ValueError(msg) + slices = get_slices(select, dset_json) + except ValueError as ve: + log.warn(f"Invalid selection: {ve}") + raise HTTPBadRequest(reason="Invalid selection") + + if _isAppend(params, body=body) and slices: + msg = "append can't be used with selection" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + if not slices: + # just return the entire dataspace + datashape = dset_json["shape"] + dims = getShapeDims(datashape) + slices = [] + for dim in dims: + s = slice(0, dim, 1) + slices.append(s) + log.debug(f"_getSelect returning: {slices}") + return slices + + +def _getSelectDtype(params, dset_dtype, body=None): + """ if a field list is defined in params or body, + create a sub-type of the dset dtype. Else, + just return the dset dtype. """ + + kw = "fields" + if isinstance(body, dict) and kw in body: + select_fields = body[kw] + elif kw in params: + fields_param = params.get(kw) + log.debug(f"fields param: {fields_param}") + select_fields = fields_param.split(":") + else: + select_fields = None + + if select_fields: + try: + select_dtype = getSubType(dset_dtype, select_fields) + except TypeError as te: + msg = f"invalid fields selection: {te}" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + log.debug(f"using select dtype: {select_dtype}") + else: + select_dtype = dset_dtype # return all fields + + return select_dtype + + +def _getLimit(params, body=None): + kw = "Limit" + if isinstance(body, dict) and kw in body: + limit = body[kw] + elif kw in params: + try: + limit = int(params[kw]) + except ValueError: + msg = "Limit param must be positive int" + log.warning(msg) + raise HTTPBadRequest(reason=msg) + else: + limit = 0 + return limit + + +def _getPoints(body, rank=1): + """ return a set of points defined in the body + as a numpy array. Return None if no points are set. 
""" + if not body or "points" not in body: + return None + + json_points = body["points"] + num_points = len(json_points) + + if rank == 1: + point_shape = (num_points,) + log.info(f"rank 1: point_shape: {point_shape}") + else: + point_shape = (num_points, rank) + log.info(f"rank >1: point_shape: {point_shape}") + try: + # use uint64 so we can address large array extents + dt = np.dtype(np.uint64) + points = jsonToArray(point_shape, dt, json_points) + except ValueError: + msg = "Bad Request: point list not valid for dataset shape" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + return points + + +def _getQuery(params, dtype, rank=1, body=None): + """ get query parameter and validate if set """ + + kw = "query" + if isinstance(body, dict) and kw in body: + query = body[kw] + elif kw in params: + query = params[kw] + else: + query = None + + if query: + if _isAppend(params, body=body): msg = "Query string can not be used with append parameter" log.warn(msg) raise HTTPBadRequest(reason=msg) - query = params["query"] + # validate the query string + if rank > 1: + msg = "Query string is not supported for multidimensional datasets" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + if len(dtype) == 0: + msg = "Query string is not supported for primitive type datasets" + log.warn(msg) - if "element_count" in params: + # following will throw HTTPBadRequest if query is malformed + getParser(query, dtype) + return query + + +def _getElementCount(params, body=None): + """ get element count as query param or body key """ + kw = "element_count" + + if isinstance(body, dict) and kw in body: + element_count = body[kw] + if not isinstance(element_count, int): + msg = f"expected int value for element_count, but got: {type(element_count)}" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + log.debug(f"element_count body: {element_count}") + elif kw in params: try: - element_count = int(params["element_count"]) + element_count = int(params[kw]) except ValueError: msg = "invalid element_count" log.warn(msg) raise HTTPBadRequest(reason=msg) - log.debug(f"element_count param: {element_count}") + log.debug(f"{kw} param: {element_count}") + else: + element_count = None - dset_id = request.match_info.get("id") - if not dset_id: - msg = "Missing dataset id" + return element_count + + +async def _getRequestData(request, http_streaming=True): + """ get input data from request + return dict for json input, bytes for non-streaming binary + or None, for streaming """ + + input_data = None + request_type = getContentType(request) + log.debug(f"_getRequestData - request_type: {request_type}") + if request_type == "json": + body = await request.json() + log.debug(f"getRequestData - got json: {body}") + if "value" in body: + input_data = body["value"] + log.debug("input_data: {input_data}") + elif "value_base64" in body: + base64_data = body["value_base64"] + base64_data = base64_data.encode("ascii") + input_data = base64.b64decode(base64_data) + else: + msg = "request has no value or value_base64 key in body" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + else: + # read binary data from request + log.info(f"request content_length: {request.content_length}") + + if isinstance(request.content_length, int): + max_request_size = int(config.get("max_request_size")) + if request.content_length >= max_request_size: + if http_streaming: + # just do an info log that we'll be paginating over a large request + msg = f"will paginate over large request with {request.content_length} bytes" + log.info(msg) + else: + msg 
= f"Request size {request.content_length} too large " + msg += f"for variable length data, max: {max_request_size}" + log.warn(msg) + raise HTTPRequestEntityTooLarge(max_request_size, request.content_length) + + if not http_streaming: + # read the request data now + # TBD: support streaming for variable length types + try: + input_data = await request_read(request) + except HTTPRequestEntityTooLarge as tle: + msg = "Got HTTPRequestEntityTooLarge exception during " + msg += f"binary read: {tle})" + log.warn(msg) + raise # re-throw + return input_data + + +async def arrayResponse(arr, request, dset_json): + """ return the array as binary or json response based on accept type """ + response_type = getAcceptType(request) + + if response_type == "binary": + output_data = arr.tobytes() + msg = f"PUT_Value query - returning {len(output_data)} bytes binary data" + log.debug(msg) + + # write response + try: + resp = StreamResponse() + if config.get("http_compression"): + log.debug("enabling http_compression") + resp.enable_compression() + resp.headers["Content-Type"] = "application/octet-stream" + resp.content_length = len(output_data) + await resp.prepare(request) + await resp.write(output_data) + await resp.write_eof() + except Exception as e: + log.error(f"Exception during binary data write: {e}") + else: + log.debug("PUT Value query - returning JSON data") + rsp_json = {} + data = arr.tolist() + log.debug(f"got rsp data {len(data)} points") + try: + json_query_data = bytesArrayToList(data) + except ValueError as err: + msg = f"Cannot decode provided bytes to list: {err}" + raise HTTPBadRequest(reason=msg) + rsp_json["value"] = json_query_data + rsp_json["hrefs"] = get_hrefs(request, dset_json) + + resp = await jsonResponse(request, rsp_json) + return resp + + +async def _doPointWrite(app, + request, + points=None, + data=None, + dset_json=None, + bucket=None + ): + """ write the given points to the dataset """ + + num_points = len(points) + log.debug(f"doPointWrite - num_points: {num_points}") + dset_id = dset_json["id"] + layout = getChunkLayout(dset_json) + datashape = dset_json["shape"] + dims = getShapeDims(datashape) + rank = len(dims) + + chunk_dict = {} # chunk ids to list of points in chunk + + for pt_indx in range(num_points): + if rank == 1: + point = int(points[pt_indx]) + else: + point_tuple = points[pt_indx] + point = [] + for i in range(len(point_tuple)): + point.append(int(point_tuple[i])) + if rank == 1: + if point < 0 or point >= dims[0]: + msg = f"PUT Value point: {point} is not within the " + msg += "bounds of the dataset" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + else: + if len(point) != rank: + msg = "PUT Value point value did not match dataset rank" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + for i in range(rank): + if point[i] < 0 or point[i] >= dims[i]: + msg = f"PUT Value point: {point} is not within the " + msg += "bounds of the dataset" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + chunk_id = getChunkId(dset_id, point, layout) + # get the pt_indx element from the input data + value = data[pt_indx] + if chunk_id not in chunk_dict: + point_list = [point, ] + point_data = [value, ] + chunk_dict[chunk_id] = {"indices": point_list, "points": point_data} + else: + item = chunk_dict[chunk_id] + point_list = item["indices"] + point_list.append(point) + point_data = item["points"] + point_data.append(value) + + num_chunks = len(chunk_dict) + log.debug(f"num_chunks: {num_chunks}") + max_chunks = int(config.get("max_chunks_per_request", default=1000)) 
+ if num_chunks > max_chunks: + msg = f"PUT value request with more than {max_chunks} chunks" log.warn(msg) - raise HTTPBadRequest(reason=msg) - if not isValidUuid(dset_id, "Dataset"): - msg = f"Invalid dataset id: {dset_id}" + + chunk_ids = list(chunk_dict.keys()) + chunk_ids.sort() + + crawler = ChunkCrawler( + app, + chunk_ids, + dset_json=dset_json, + bucket=bucket, + points=chunk_dict, + action="write_point_sel", + ) + await crawler.crawl() + + crawler_status = crawler.get_status() + + if crawler_status not in (200, 201): + msg = f"doPointWritte raising HTTPInternalServerError for status: {crawler_status}" + log.error(msg) + raise HTTPInternalServerError() + else: + log.info("doPointWrite success") + + +async def _doHyperslabWrite(app, + request, + page_number=0, + page=None, + data=None, + dset_json=None, + select_dtype=None, + bucket=None + ): + """ write the given page selection to the dataset """ + dset_id = dset_json["id"] + log.info(f"_doHyperslabWrite on {dset_id} - page: {page_number}") + type_json = dset_json["type"] + item_size = getItemSize(type_json) + layout = getChunkLayout(dset_json) + + num_chunks = getNumChunks(page, layout) + log.debug(f"num_chunks: {num_chunks}") + max_chunks = int(config.get("max_chunks_per_request", default=1000)) + if num_chunks > max_chunks: + msg = f"PUT value chunk count: {num_chunks} exceeds max_chunks: {max_chunks}" + log.warn(msg) + select_shape = getSelectionShape(page) + log.debug(f"got select_shape: {select_shape} for page: {page_number}") + num_bytes = math.prod(select_shape) * item_size + if data is None: + log.debug(f"reading {num_bytes} from request stream") + # read page of data from input stream + try: + page_bytes = await request_read(request, count=num_bytes) + except HTTPRequestEntityTooLarge as tle: + msg = "Got HTTPRequestEntityTooLarge exception during " + msg += f"binary read: {tle}) for page: {page_number}" + log.warn(msg) + raise # re-throw + except IncompleteReadError as ire: + msg = "Got asyncio.IncompleteReadError during binary " + msg += f"read: {ire} for page: {page_number}" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + log.debug(f"read {len(page_bytes)} for page: {page_number}") + try: + arr = bytesToArray(page_bytes, select_dtype, select_shape) + except ValueError as ve: + msg = f"bytesToArray value error for page: {page_number}: {ve}" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + else: + arr = data # use array provided to function + + try: + chunk_ids = getChunkIds(dset_id, page, layout) + except ValueError: + log.warn("getChunkIds failed") + raise HTTPInternalServerError() + if len(chunk_ids) < 10: + log.debug(f"chunk_ids: {chunk_ids}") + else: + log.debug(f"chunk_ids: {chunk_ids[:10]} ...") + if len(chunk_ids) > max_chunks: + msg = f"got {len(chunk_ids)} for page: {page_number}. 
max_chunks: {max_chunks}" + log.warn(msg) + + crawler = ChunkCrawler( + app, + chunk_ids, + dset_json=dset_json, + bucket=bucket, + slices=page, + arr=arr, + action="write_chunk_hyperslab", + ) + await crawler.crawl() + + crawler_status = crawler.get_status() + + if crawler_status not in (200, 201): + msg = f"crawler failed for page: {page_number} with status: {crawler_status}" + log.warn(msg) + else: + log.info("crawler write_chunk_hyperslab successful") + + +async def PUT_Value(request): + """ + Handler for PUT //value request + """ + app = request.app + bucket = None + body = None + query = None + params = request.rel_url.query + append_rows = None # this is a append update or not + append_dim = 0 + num_elements = None + element_count = None + limit = None # query limit + + arr_rsp = None # array data to return if any + + if not request.has_body: + msg = "PUT Value with no body" log.warn(msg) raise HTTPBadRequest(reason=msg) - username, pswd = getUserPasswordFromRequest(request) - await validateUserPassword(app, username, pswd) + log.request(request) domain = getDomainFromRequest(request) if not isValidDomain(domain): @@ -148,281 +658,134 @@ async def PUT_Value(request): raise HTTPBadRequest(reason=msg) bucket = getBucketForDomain(domain) - request_type = getContentType(request) - - log.debug(f"PUT value - request_type is {request_type}") - - if not request.has_body: - msg = "PUT Value with no body" + dset_id = request.match_info.get("id") + if not dset_id: + msg = "Missing dataset id" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + if not isValidUuid(dset_id, "Dataset"): + msg = f"Invalid dataset id: {dset_id}" log.warn(msg) raise HTTPBadRequest(reason=msg) - if request_type == "json": - try: - body = await request.json() - except JSONDecodeError: - msg = "Unable to load JSON body" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - if "append" in body and body["append"]: - try: - append_rows = int(body["append"]) - except ValueError: - msg = "invalid append value in body" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - log.info(f"append_rows: {append_rows}") + # authenticate and authorize action + username, pswd = getUserPasswordFromRequest(request) + await validateUserPassword(app, username, pswd) - if append_rows: - for key in ("start", "stop", "step"): - if key in body: - msg = f"body key {key} can not be used with append" - log.warn(msg) - raise HTTPBadRequest(reason=msg) + await validateAction(app, domain, dset_id, username, "update") - if "append_dim" in body and body["append_dim"]: - try: - append_dim = int(body["append_dim"]) - except ValueError: - msg = "invalid append_dim" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - log.info(f"append_dim: {append_dim}") + request_type = getContentType(request) - # get state for dataset from DN. 
+ log.debug(f"PUT value - request_type is {request_type}") + + # get state for dataset from DN - will need this to validate + # some of the query parameters dset_json = await getDsetJson(app, dset_id, bucket=bucket) - layout = None datashape = dset_json["shape"] - if datashape["class"] == "H5S_NULL": + if isNullSpace(dset_json): msg = "Null space datasets can not be used as target for PUT value" log.warn(msg) raise HTTPBadRequest(reason=msg) dims = getShapeDims(datashape) - maxdims = getDsetMaxDims(dset_json) rank = len(dims) - if query and rank > 1: - msg = "Query string is not supported for multidimensional arrays" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - - layout = getChunkLayout(dset_json) - type_json = dset_json["type"] dset_dtype = createDataType(type_json) item_size = getItemSize(type_json) - max_request_size = int(config.get("max_request_size")) - if query: - # divert here if we are doing a put query - # returns array data like a GET query request - log.debug(f"got query: {query}") + if request_type == "json": try: - parser = BooleanParser(query) - except Exception: - msg = f"query: {query} is not valid" + body = await request.json() + except JSONDecodeError: + msg = "Unable to load JSON body" log.warn(msg) raise HTTPBadRequest(reason=msg) - field_names = set(dset_dtype.names) - variables = parser.getVariables() - for variable in variables: - if variable not in field_names: - msg = f"query variable {variable} not valid" - log.warn(msg) - raise HTTPBadRequest(reason=msg) + log.debug(f"got body: {body}") - select = params.get("select") - try: - slices = get_slices(select, dset_json) - except ValueError as ve: - log.warn(f"Invalid selection: {ve}") - raise HTTPBadRequest(reason="Invalid selection") + select_dtype = _getSelectDtype(params, dset_dtype, body=body) + append_rows = _getAppendRows(params, dset_json, body=body) - if "Limit" in params: - try: - limit = int(params["Limit"]) - except ValueError: - msg = "Limit param must be positive int" - log.warning(msg) - raise HTTPBadRequest(reason=msg) - else: - limit = 0 + if append_rows: + append_dim = _getAppendDim(params, body=body) + log.debug(f"append_rows: {append_rows}, append_dim: {append_dim}") + + points = _getPoints(body, rank) + if points is not None: + log.debug(f"got points: {points.shape}") + if append_rows: + msg = "points not valid with append update" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + # if there's no selection parameter, this will return entire dataspace + slices = _getSelect(params, dset_json, body=body) + + query = _getQuery(params, dset_dtype, rank=rank, body=body) + + element_count = _getElementCount(params, body=body) + + if item_size == 'H5T_VARIABLE' or element_count or not use_http_streaming(request, rank): + http_streaming = False + else: + http_streaming = True + + if query: + # divert here if we are doing a put query + # returns array data like a GET query request + log.debug(f"got query: {query}") + limit = _getLimit(params, body=body) arr_rsp = await getSelectionData( app, dset_id, dset_json, - slices, + slices=slices, query=query, bucket=bucket, limit=limit, query_update=body, - method=request.method, ) - - response_type = getAcceptType(request) - - if response_type == "binary": - output_data = arr_rsp.tobytes() - msg = f"PUT_Value query - returning {len(output_data)} bytes binary data" - log.debug(msg) - - # write response - try: - resp = StreamResponse() - if config.get("http_compression"): - log.debug("enabling http_compression") - resp.enable_compression() - 
resp.headers["Content-Type"] = "application/octet-stream" - resp.content_length = len(output_data) - await resp.prepare(request) - await resp.write(output_data) - await resp.write_eof() - except Exception as e: - log.error(f"Exception during binary data write: {e}") - else: - log.debug("PUT Value query - returning JSON data") - rsp_json = {} - data = arr_rsp.tolist() - log.debug(f"got rsp data {len(data)} points") - try: - json_query_data = bytesArrayToList(data) - except ValueError as err: - msg = f"Cannot decode provided bytes to list: {err}" - raise HTTPBadRequest(reason=msg) - rsp_json["value"] = json_query_data - rsp_json["hrefs"] = get_hrefs(request, dset_json) - resp = await jsonResponse(request, rsp_json) + resp = await arrayResponse(arr_rsp, request, dset_json) log.response(request, resp=resp) return resp - # Resume regular PUT_Value processing without query update - dset_dtype = createDataType(type_json) # np datatype + # regular PUT_Value processing without query update binary_data = None - points = None # used for point selection writes np_shape = [] # shape of incoming data bc_shape = [] # shape of broadcast array (if element_count is set) - slices = [] # selection area to write to - - if item_size == 'H5T_VARIABLE' or element_count or not use_http_streaming(request, rank): - http_streaming = False - else: - http_streaming = True - - # body could also contain a point selection specifier - if body and "points" in body: - if append_rows: - msg = "points not valid with append update" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - - json_points = body["points"] - num_points = len(json_points) - if rank == 1: - point_shape = (num_points,) - log.info(f"rank 1: point_shape: {point_shape}") - else: - point_shape = (num_points, rank) - log.info(f"rank >1: point_shape: {point_shape}") - try: - # use uint64 so we can address large array extents - dt = np.dtype(np.uint64) - points = jsonToArray(point_shape, dt, json_points) - except ValueError: - msg = "Bad Request: point list not valid for dataset shape" - log.warn(msg) - raise HTTPBadRequest(reason=msg) + input_data = await _getRequestData(request, http_streaming=http_streaming) + # could be int, list, str, bytes, or None + log.debug(f"got input data type: {type(input_data)}") if append_rows: - # shape must be extensible - if not isExtensible(dims, maxdims): - msg = "Dataset shape must be extensible for packet updates" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - if append_dim < 0 or append_dim > rank - 1: - msg = "invalid append_dim" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - maxdims = getDsetMaxDims(dset_json) - if maxdims[append_dim] != 0: - if dims[append_dim] + append_rows > maxdims[append_dim]: - log.warn("unable to append to dataspace") - raise HTTPConflict() - - if request_type == "json": - if "value" in body: - json_data = body["value"] - - elif "value_base64" in body: - base64_data = body["value_base64"] - base64_data = base64_data.encode("ascii") - binary_data = base64.b64decode(base64_data) - else: - msg = "PUT value has no value or value_base64 key in body" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - else: - # read binary data from request - log.info(f"request content_length: {request.content_length}") - - if isinstance(request.content_length, int): - if request.content_length >= max_request_size: - if http_streaming: - # just do an info log that we'll be paginating over a large request - msg = f"will paginate over large request with {request.content_length} bytes" - log.info(msg) - else: - 
msg = f"Request size {request.content_length} too large " - msg += f"for variable length data, max: {max_request_size}" - log.warn(msg) - raise HTTPRequestEntityTooLarge(max_request_size, request.content_length) - - if not http_streaming: - # read the request data now - # TBD: support streaming for variable length types - try: - binary_data = await request_read(request) - except HTTPRequestEntityTooLarge as tle: - msg = "Got HTTPRequestEntityTooLarge exception during " - msg += f"binary read: {tle})" + # extend datashape and set slices to shape of appended region + # make sure any other dimensions are non-zero + for dim in range(rank): + if dim == append_dim: + continue + extent = dims[dim] + if extent == 0: + msg = f"zero-extent datashape in dimension {dim} " + msg += "must be extended before appending values" log.warn(msg) - raise # re-throw + raise HTTPBadRequest(reason=msg) - if append_rows: - for i in range(rank): - if i == append_dim: - np_shape.append(append_rows) - # this will be adjusted once the dataspace is extended - slices.append(slice(0, append_rows, 1)) - else: - if dims[i] == 0: - dims[i] = 1 # need a non-zero extent for all dimensionas - np_shape.append(dims[i]) - slices.append(slice(0, dims[i], 1)) + slices = await extendShape(app, dset_json, append_rows, axis=append_dim, bucket=bucket) + np_shape = getSelectionShape(slices) log.debug(f"np_shape based on append_rows: {np_shape}") - np_shape = tuple(np_shape) - elif points is None: - try: - if body and "start" in body and "stop" in body: - slices = get_slices(body, dset_json) - else: - select = params.get("select") - slices = get_slices(select, dset_json) - except ValueError as ve: - log.warn(f"Invalid Selection: {ve}") - raise HTTPBadRequest(reason="Invalid Selection") - # The selection parameters will determine expected put value shape log.debug(f"PUT Value selection: {slices}") # not point selection, get hyperslab selection shape np_shape = getSelectionShape(slices) else: # point update - np_shape = [num_points,] + np_shape = [len(points),] + + np_shape = tuple(np_shape) # no more edits log.debug(f"selection shape: {np_shape}") if np.prod(np_shape) == 0: @@ -449,33 +812,29 @@ async def PUT_Value(request): log.debug(f"selection num elements: {num_elements}") arr = None # np array to hold request data - if binary_data: + if isinstance(input_data, bytes): + # non-streaming binary input if item_size == "H5T_VARIABLE": # binary variable length data try: - arr = bytesToArray(binary_data, dset_dtype, [num_elements,]) + arr = bytesToArray(input_data, select_dtype, [num_elements,]) except ValueError as ve: log.warn(f"bytesToArray value error: {ve}") raise HTTPBadRequest() else: # fixed item size - if len(binary_data) % item_size != 0: + if len(input_data) % item_size != 0: msg = f"Expected request size to be a multiple of {item_size}, " msg += f"but {len(binary_data)} bytes received" log.warn(msg) raise HTTPBadRequest(reason=msg) - if len(binary_data) // item_size != num_elements: + if len(input_data) // item_size != num_elements: msg = f"expected {item_size * num_elements} bytes but got {len(binary_data)}" log.warn(msg) raise HTTPBadRequest(reason=msg) - # check against max request size - if num_elements * item_size > max_request_size: - msg = f"read {num_elements*item_size} bytes, greater than {max_request_size}" - log.warn(msg) - - arr = np.fromstring(binary_data, dtype=dset_dtype) + arr = np.fromstring(input_data, dtype=dset_dtype) log.debug(f"read fixed type array: {arr}") if bc_shape: @@ -486,7 +845,7 @@ async def 
PUT_Value(request): else: # need to instantiate the full np_shape since chunk boundries # will effect how individual chunks get set - arr_tmp = np.zeros(np_shape, dtype=dset_dtype) + arr_tmp = np.zeros(np_shape, dtype=select_dtype) arr_tmp[...] = arr arr = arr_tmp @@ -509,9 +868,9 @@ async def PUT_Value(request): # only enable broadcast if not appending if bc_shape: - arr = jsonToArray(bc_shape, dset_dtype, json_data) + arr = jsonToArray(bc_shape, select_dtype, input_data) else: - arr = jsonToArray(np_shape, dset_dtype, json_data) + arr = jsonToArray(np_shape, select_dtype, input_data) if num_elements != np.prod(arr.shape): msg = f"expected {num_elements} elements, but got {np.prod(arr.shape)}" @@ -519,7 +878,7 @@ async def PUT_Value(request): if bc_shape and element_count != 1: # broadcast to target - arr_tmp = np.zeros(np_shape, dtype=dset_dtype) + arr_tmp = np.zeros(np_shape, dtype=select_dtype) arr_tmp[...] = arr arr = arr_tmp except ValueError: @@ -533,217 +892,42 @@ async def PUT_Value(request): raise HTTPBadRequest(reason=msg) log.debug(f"got json arr: {arr.shape}") else: - log.debug("will using streaming for request data") - - if append_rows: - # extend the shape of the dataset - req = getDataNodeUrl(app, dset_id) + "/datasets/" + dset_id + "/shape" - body = {"extend": append_rows, "extend_dim": append_dim} - params = {} - if bucket: - params["bucket"] = bucket - selection = None - try: - shape_rsp = await http_put(app, req, data=body, params=params) - log.info(f"got shape put rsp: {shape_rsp}") - if "selection" in shape_rsp: - selection = shape_rsp["selection"] - except HTTPConflict: - log.warn("got 409 extending dataspace for PUT value") - raise - if not selection: - log.error("expected to get selection in PUT shape response") - raise HTTPInternalServerError() - # selection should be in the format [:,n:m,:]. 
- # extract n and m and use it to update the slice for the - # appending dimension - if not selection.startswith("[") or not selection.endswith("]"): - log.error("Unexpected selection in PUT shape response") - raise HTTPInternalServerError() - selection = selection[1:-1] # strip off brackets - parts = selection.split(",") - for part in parts: - if part == ":": - continue - bounds = part.split(":") - if len(bounds) != 2: - log.error("Unexpected selection in PUT shape response") - raise HTTPInternalServerError() - lb = ub = 0 - try: - lb = int(bounds[0]) - ub = int(bounds[1]) - except ValueError: - log.error("Unexpected selection in PUT shape response") - raise HTTPInternalServerError() - log.info(f"lb: {lb} ub: {ub}") - # update the slices to indicate where to place the data - slices[append_dim] = slice(lb, ub, 1) + log.debug("will use streaming for request data") slices = tuple(slices) # no more edits to slices - crawler_status = None # will be set below if points is None: + # do a hyperslab write if arr is not None: # make a one page list to handle the write in one chunk crawler run # (larger write request should user binary streaming) pages = (slices,) log.debug(f"non-streaming data, setting page list to: {slices}") else: + max_request_size = int(config.get("max_request_size")) pages = getSelectionPagination(slices, dims, item_size, max_request_size) log.debug(f"getSelectionPagination returned: {len(pages)} pages") - bytes_streamed = 0 - max_chunks = int(config.get("max_chunks_per_request", default=1000)) for page_number in range(len(pages)): page = pages[page_number] msg = f"streaming request data for page: {page_number+1} of {len(pages)}, " msg += f"selection: {page}" log.info(msg) - num_chunks = getNumChunks(page, layout) - log.debug(f"num_chunks: {num_chunks}") - if num_chunks > max_chunks: - log.warn( - f"PUT value chunk count: {num_chunks} exceeds max_chunks: {max_chunks}" - ) - select_shape = getSelectionShape(page) - log.debug(f"got select_shape: {select_shape} for page: {page}") - num_bytes = math.prod(select_shape) * item_size - if arr is None or page_number > 0: - log.debug( - f"page: {page_number} reading {num_bytes} from request stream" - ) - # read page of data from input stream - try: - page_bytes = await request_read(request, count=num_bytes) - except HTTPRequestEntityTooLarge as tle: - msg = "Got HTTPRequestEntityTooLarge exception during " - msg += f"binary read: {tle})" - log.warn(msg) - raise # re-throw - except IncompleteReadError as ire: - msg = "Got asyncio.IncompleteReadError during binary " - msg += f"read: {ire}" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - log.debug(f"read {len(page_bytes)} for page: {page_number+1}") - bytes_streamed += len(page_bytes) - try: - arr = bytesToArray(page_bytes, dset_dtype, select_shape) - except ValueError as ve: - msg = f"bytesToArray value error for page: {page_number+1}: {ve}" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - - try: - chunk_ids = getChunkIds(dset_id, page, layout) - except ValueError: - log.warn("getChunkIds failed") - raise HTTPInternalServerError() - log.debug(f"chunk_ids: {chunk_ids}") - if len(chunk_ids) > max_chunks: - msg = f"got {len(chunk_ids)} for page: {page_number+1}. 
max_chunks: {max_chunks}" - log.warn(msg) - - crawler = ChunkCrawler( - app, - chunk_ids, - dset_json=dset_json, - bucket=bucket, - slices=page, - arr=arr, - action="write_chunk_hyperslab", - ) - await crawler.crawl() - - crawler_status = crawler.get_status() - - if crawler_status not in (200, 201): - msg = f"crawler failed for page: {page_number+1} with status: {crawler_status}" - log.warn(msg) + kwargs = {"page_number": page_number, "page": page} + kwargs["dset_json"] = dset_json + kwargs["bucket"] = bucket + kwargs["select_dtype"] = select_dtype + if arr is not None and page_number == 0: + kwargs["data"] = arr else: - log.info("crawler write_chunk_hyperslab successful") - + kwargs["data"] = None + # do write for one page selection + await _doHyperslabWrite(app, request, **kwargs) else: # # Do point put # - log.debug(f"num_points: {num_points}") - - chunk_dict = {} # chunk ids to list of points in chunk - - for pt_indx in range(num_points): - if rank == 1: - point = int(points[pt_indx]) - else: - point_tuple = points[pt_indx] - point = [] - for i in range(len(point_tuple)): - point.append(int(point_tuple[i])) - if rank == 1: - if point < 0 or point >= dims[0]: - msg = f"PUT Value point: {point} is not within the " - msg += "bounds of the dataset" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - else: - if len(point) != rank: - msg = "PUT Value point value did not match dataset rank" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - for i in range(rank): - if point[i] < 0 or point[i] >= dims[i]: - msg = f"PUT Value point: {point} is not within the " - msg += "bounds of the dataset" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - chunk_id = getChunkId(dset_id, point, layout) - # get the pt_indx element from the input data - value = arr[pt_indx] - if chunk_id not in chunk_dict: - point_list = [ - point, - ] - point_data = [ - value, - ] - chunk_dict[chunk_id] = {"indices": point_list, "points": point_data} - else: - item = chunk_dict[chunk_id] - point_list = item["indices"] - point_list.append(point) - point_data = item["points"] - point_data.append(value) - - num_chunks = len(chunk_dict) - log.debug(f"num_chunks: {num_chunks}") - max_chunks = int(config.get("max_chunks_per_request", default=1000)) - if num_chunks > max_chunks: - msg = f"PUT value request with more than {max_chunks} chunks" - log.warn(msg) - - chunk_ids = list(chunk_dict.keys()) - chunk_ids.sort() - - crawler = ChunkCrawler( - app, - chunk_ids, - dset_json=dset_json, - bucket=bucket, - points=chunk_dict, - action="write_point_sel", - ) - await crawler.crawl() - - crawler_status = crawler.get_status() - - if crawler_status == 400: - log.info(f"doWriteSelection raising BadRequest error: {crawler_status}") - raise HTTPBadRequest() - if crawler_status not in (200, 201): - log.info( - f"doWriteSelection raising HTTPInternalServerError for status: {crawler_status}" - ) - raise HTTPInternalServerError() + kwargs = {"points": points, "data": arr, "dset_json": dset_json, "bucket": bucket} + await _doPointWrite(app, request, **kwargs) # write successful @@ -807,26 +991,15 @@ async def GET_Value(request): await validateAction(app, domain, dset_id, username, "read") # Get query parameter for selection - select = params.get("select") - if select: - log.debug(f"select query param: {select}") - try: - slices = get_slices(select, dset_json) - except ValueError as ve: - log.warn(f"Invalid selection: {ve}") - raise HTTPBadRequest(reason="Invalid selection") + slices = _getSelect(params, dset_json) + + # dtype for selection, or 
just dset_dtype if no fields are given + select_dtype = _getSelectDtype(params, dset_dtype) log.debug(f"GET Value selection: {slices}") + log.debug(f"dset_dtype: {dset_dtype}, select_dtype: {select_dtype}") - limit = 0 - if "Limit" in params: - try: - limit = int(params["Limit"]) - log.debug(f"limit: {limit}") - except ValueError: - msg = "Invalid Limit query param" - log.warn(msg) - raise HTTPBadRequest(reason=msg) + limit = _getLimit(params) if "ignore_nan" in params and params["ignore_nan"]: ignore_nan = True @@ -834,23 +1007,7 @@ async def GET_Value(request): ignore_nan = False log.debug(f"ignore nan: {ignore_nan}") - query = params.get("query") - if query: - log.debug(f"got query: {query}") - try: - parser = BooleanParser(query) - except Exception: - msg = f"query: {query} is not valid" - log.warn(msg) - raise HTTPBadRequest(reason=msg) - - field_names = set(dset_dtype.names) - variables = parser.getVariables() - for variable in variables: - if variable not in field_names: - msg = f"query variable {variable} not valid" - log.warn(msg) - raise HTTPBadRequest(reason=msg) + query = _getQuery(params, dset_dtype, rank=rank) response_type = getAcceptType(request) @@ -918,9 +1075,7 @@ async def GET_Value(request): page_item_size = VARIABLE_AVG_ITEM_SIZE # random guess of avg item_size else: page_item_size = item_size - pages = getSelectionPagination( - slices, dims, page_item_size, max_request_size - ) + pages = getSelectionPagination(slices, dims, page_item_size, max_request_size) log.debug(f"getSelectionPagination returned: {len(pages)} pages") bytes_streamed = 0 try: @@ -930,19 +1085,21 @@ async def GET_Value(request): msg += f"of {len(pages)}, selection: {page}" log.info(msg) + log.debug("calling getSelectionData!") + arr = await getSelectionData( app, dset_id, dset_json, - page, + slices=page, + select_dtype=select_dtype, query=query, bucket=bucket, - limit=limit, - method=request.method, + limit=limit ) if arr is None or math.prod(arr.shape) == 0: - log.warn(f"no data returend for streaming page: {page_number}") + log.warn(f"no data returned for streaming page: {page_number}") continue log.debug("preparing binary response") @@ -954,9 +1111,8 @@ async def GET_Value(request): if query and limit > 0: query_rows = arr.shape[0] - log.debug( - f"streaming page {page_number} returned {query_rows} rows" - ) + msg = f"streaming page {page_number} returned {query_rows} rows" + log.debug(msg) limit -= query_rows if limit <= 0: log.debug("skipping remaining pages, query limit reached") @@ -968,6 +1124,8 @@ async def GET_Value(request): resp_json["status"] = he.status_code # can't raise a HTTPException here since write is in progress # + except Exception as e: + log.error(f"got {type(e)} exception doing getSelectionData: {e}") finally: msg = f"streaming data for {len(pages)} pages complete, " msg += f"{bytes_streamed} bytes written" @@ -985,11 +1143,11 @@ async def GET_Value(request): app, dset_id, dset_json, - slices, + slices=slices, + select_dtype=select_dtype, query=query, bucket=bucket, - limit=limit, - method=request.method, + limit=limit ) except HTTPException as he: # close the response stream @@ -1076,12 +1234,6 @@ async def POST_Value(request): log.info(f"POST_Value, dataset id: {dset_id}") - username, pswd = getUserPasswordFromRequest(request) - if username is None and app["allow_noauth"]: - username = "default" - else: - await validateUserPassword(app, username, pswd) - domain = getDomainFromRequest(request) if not isValidDomain(domain): msg = f"Invalid domain: {domain}" @@ -1089,14 
+1241,18 @@ async def POST_Value(request): raise HTTPBadRequest(reason=msg) bucket = getBucketForDomain(domain) + username, pswd = getUserPasswordFromRequest(request) + if username is None and app["allow_noauth"]: + username = "default" + else: + await validateUserPassword(app, username, pswd) + await validateAction(app, domain, dset_id, username, "read") + accept_type = getAcceptType(request) response_type = accept_type # will adjust later if binary not possible params = request.rel_url.query - if "ignore_nan" in params and params["ignore_nan"]: - ignore_nan = True - else: - ignore_nan = False + ignore_nan = _isIgnoreNan(params) request_type = getContentType(request) log.debug(f"POST value - request_type is {request_type}") @@ -1109,28 +1265,28 @@ async def POST_Value(request): # get state for dataset from DN. dset_json = await getDsetJson(app, dset_id, bucket=bucket) - datashape = dset_json["shape"] - if datashape["class"] == "H5S_NULL": + if isNullSpace(dset_json): msg = "POST value not supported for datasets with NULL shape" log.warn(msg) raise HTTPBadRequest(reason=msg) - if datashape["class"] == "H5S_SCALAR": + if isScalarSpace(dset_json): msg = "POST value not supported for datasets with SCALAR shape" log.warn(msg) raise HTTPBadRequest(reason=msg) + datashape = dset_json["shape"] dims = getShapeDims(datashape) rank = len(dims) type_json = dset_json["type"] item_size = getItemSize(type_json) + dset_dtype = createDataType(type_json) log.debug(f"item size: {item_size}") - await validateAction(app, domain, dset_id, username, "read") - # read body data slices = None # this will be set for hyperslab selection points = None # this will be set for point selection point_dt = np.dtype("u8") # use unsigned long for point index + if request_type == "json": try: body = await request.json() @@ -1138,6 +1294,17 @@ async def POST_Value(request): msg = "Unable to load JSON body" log.warn(msg) raise HTTPBadRequest(reason=msg) + + if _isSelect(params, body=body) and "points" in body: + msg = "Unexpected points and select key in request body" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + slices = _getSelect(params, dset_json, body=body) + select_dtype = _getSelectDtype(params, dset_dtype, body=body) + log.debug(f"got select_dtype: {select_dtype}") + + if request_type == "json": if "points" in body: points_list = body["points"] if not isinstance(points_list, list): @@ -1146,16 +1313,8 @@ async def POST_Value(request): raise HTTPBadRequest(reason=msg) points = np.asarray(points_list, dtype=point_dt) log.debug(f"get {len(points)} points from json request") - elif "select" in body: - select = body["select"] - log.debug(f"select: {select}") - try: - slices = get_slices(select, dset_json) - except ValueError as ve: - log.warn(f"Invalid selection: {ve}") - raise HTTPBadRequest(reason="Invalid selection") - log.debug(f"got slices: {slices}") - else: + + elif not _isSelect(params, body=body): msg = "Expected points or select key in request body" log.warn(msg) raise HTTPBadRequest(reason=msg) @@ -1187,19 +1346,13 @@ async def POST_Value(request): if points is not None: log.debug(f"got {len(points)} num_points") - # get expected content_length - item_size = getItemSize(type_json) - log.debug(f"item size: {item_size}") - # get the shape of the response array - if slices: + if _isSelect(params, body=body): # hyperslab post np_shape = getSelectionShape(slices) else: # point selection - np_shape = [ - len(points), - ] + np_shape = [len(points), ] log.debug(f"selection shape: {np_shape}") @@ -1262,10 +1415,11 
@@ async def POST_Value(request): await resp.prepare(request) kwargs = {"bucket": bucket} - if slices is not None: + if points is None: kwargs["slices"] = slices - if points is not None: + else: kwargs["points"] = points + kwargs["select_dtype"] = select_dtype log.debug(f"getSelectionData kwargs: {kwargs}") arr_rsp = await getSelectionData(app, dset_id, dset_json, **kwargs) diff --git a/hsds/dset_dn.py b/hsds/dset_dn.py index e250bde9..780f006c 100755 --- a/hsds/dset_dn.py +++ b/hsds/dset_dn.py @@ -206,7 +206,7 @@ async def PUT_DatasetShape(request): dset_id = request.match_info.get("id") if not isValidUuid(dset_id, obj_class="dataset"): - log.error("Unexpected type_id: {}".format(dset_id)) + log.error(f"Unexpected dset_id: {dset_id}") raise HTTPInternalServerError() body = await request.json() diff --git a/hsds/dset_lib.py b/hsds/dset_lib.py index 5f93b1ea..5534d2b6 100755 --- a/hsds/dset_lib.py +++ b/hsds/dset_lib.py @@ -15,19 +15,19 @@ import numpy as np from aiohttp.client_exceptions import ClientError -from aiohttp.web_exceptions import HTTPBadRequest, HTTPInternalServerError -from .util.hdf5dtype import createDataType, getItemSize +from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPInternalServerError from .util.arrayUtil import getNumpyValue +from .util.boolparser import BooleanParser from .util.dsetUtil import isNullSpace, getDatasetLayout, getDatasetLayoutClass from .util.dsetUtil import getChunkLayout, getSelectionShape, getShapeDims, get_slices from .util.chunkUtil import getChunkCoordinate, getChunkIndex, getChunkSuffix from .util.chunkUtil import getNumChunks, getChunkIds, getChunkId from .util.chunkUtil import getChunkCoverage, getDataCoverage from .util.chunkUtil import getQueryDtype, get_chunktable_dims - +from .util.hdf5dtype import createDataType, getItemSize +from .util.httpUtil import http_delete, http_put from .util.idUtil import getDataNodeUrl, isSchema2Id, getS3Key, getObjId from .util.storUtil import getStorKeys -from .util.httpUtil import http_delete from .servicenode_lib import getDsetJson from .chunk_crawl import ChunkCrawler @@ -237,10 +237,8 @@ def getChunkItem(chunkid): table_factor = table_factors[0] for i in range(num_chunks): chunk_id = chunk_ids[i] - log.debug(f"chunk_id: {chunk_id}") chunk_index = getChunkIndex(chunk_id) chunk_index = chunk_index[0] - log.debug(f"chunk_index: {chunk_index}") for j in range(table_factor): index = chunk_index * table_factor + j arr_index = i * table_factor + j @@ -253,12 +251,10 @@ def getChunkItem(chunkid): arr_points = np.zeros((num_chunks, rank), dtype=np.dtype("u8")) for i in range(num_chunks): chunk_id = chunk_ids[i] - log.debug(f"chunk_id for chunktable: {chunk_id}") indx = getChunkIndex(chunk_id) - log.debug(f"get chunk indx: {indx}") arr_points[i] = indx - msg = f"got chunktable points: {arr_points}, calling getSelectionData" + msg = f"got chunktable - {len(arr_points)} entries, calling getSelectionData" log.debug(msg) # this call won't lead to a circular loop of calls since we've checked # that the chunktable layout is not H5D_CHUNKED_REF_INDIRECT @@ -313,7 +309,7 @@ def getChunkItem(chunkid): log.error(f"Unexpected chunk layout: {layout['class']}") raise HTTPInternalServerError() - log.debug(f"returning chunkinfo_map: {chunkinfo_map}") + log.debug(f"returning chunkinfo_map: {len(chunkinfo_map)} items") return chunkinfo_map @@ -321,7 +317,7 @@ def get_chunkmap_selections(chunk_map, chunk_ids, slices, dset_json): """Update chunk_map with chunk and data selections for the given set of slices """ - 
log.debug(f"get_chunkmap_selections - chunk_ids: {chunk_ids}") + log.debug(f"get_chunkmap_selections - {len(chunk_ids)} chunk_ids") if not slices: log.debug("no slices set, returning") return # nothing to do @@ -348,7 +344,7 @@ def get_chunk_selections(chunk_map, chunk_ids, slices, dset_json): """Update chunk_map with chunk and data selections for the given set of slices """ - log.debug(f"get_chunk_selections - chunk_ids: {chunk_ids}") + log.debug(f"get_chunk_selections - {len(chunk_ids)} chunk_ids") if not slices: log.debug("no slices set, returning") return # nothing to do @@ -362,26 +358,47 @@ def get_chunk_selections(chunk_map, chunk_ids, slices, dset_json): chunk_map[chunk_id] = item chunk_sel = getChunkCoverage(chunk_id, slices, layout) - log.debug( - f"get_chunk_selections - chunk_id: {chunk_id}, chunk_sel: {chunk_sel}" - ) + msg = f"get_chunk_selections - chunk_id: {chunk_id}, chunk_sel: {chunk_sel}" + log.debug(msg) item["chunk_sel"] = chunk_sel data_sel = getDataCoverage(chunk_id, slices, layout) log.debug(f"get_chunk_selections - data_sel: {data_sel}") item["data_sel"] = data_sel +def getParser(query, dtype): + """ get query BooleanParser. If query contains variables that + arent' part of the data type, throw a HTTPBadRequest exception. """ + + try: + parser = BooleanParser(query) + except Exception: + msg = f"query: {query} is not valid" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + field_names = set(dtype.names) + variables = parser.getVariables() + for variable in variables: + if variable not in field_names: + msg = f"query variable {variable} not valid" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + return parser + + async def getSelectionData( app, dset_id, dset_json, slices=None, + select_dtype=None, points=None, query=None, query_update=None, bucket=None, - limit=0, - method="GET", + limit=0 ): """Read selected slices and return numpy array""" log.debug("getSelectionData") @@ -441,17 +458,14 @@ async def getSelectionData( # get chunk selections for hyperslab select get_chunk_selections(chunkinfo, chunk_ids, slices, dset_json) - log.debug(f"chunkinfo_map: {chunkinfo}") - - if method == "OPTIONS": - # skip doing any big data load for options request - return None + log.debug(f"chunkinfo_map: {len(chunkinfo)} items") arr = await doReadSelection( app, chunk_ids, dset_json, slices=slices, + select_dtype=select_dtype, points=points, query=query, query_update=query_update, @@ -468,6 +482,7 @@ async def doReadSelection( chunk_ids, dset_json, slices=None, + select_dtype=None, points=None, query=None, query_update=None, @@ -477,26 +492,31 @@ async def doReadSelection( ): """read selection utility function""" log.info(f"doReadSelection - number of chunk_ids: {len(chunk_ids)}") - log.debug(f"doReadSelection - chunk_ids: {chunk_ids}") + if len(chunk_ids) < 10: + log.debug(f"chunk_ids: {chunk_ids}") + else: + log.debug(f"chunk_ids: {chunk_ids[:10]} ...") + log.debug(f"doReadSelection - select_dtype: {select_dtype}") type_json = dset_json["type"] item_size = getItemSize(type_json) log.debug(f"item size: {item_size}") dset_dtype = createDataType(type_json) # np datatype + if select_dtype is None: + select_dtype = dset_dtype if query is None: query_dtype = None else: log.debug(f"query: {query} limit: {limit}") - query_dtype = getQueryDtype(dset_dtype) + query_dtype = getQueryDtype(select_dtype) + log.debug(f"query_dtype: {query_dtype}") # create array to hold response data arr = None if points is not None: # point selection - np_shape = [ - len(points), - ] + np_shape = 
[len(points), ] elif query is not None: # return shape will be determined by number of matches np_shape = None @@ -527,10 +547,10 @@ async def doReadSelection( fill_value = getFillValue(dset_json) if fill_value is not None: - arr = np.empty(np_shape, dtype=dset_dtype, order="C") + arr = np.empty(np_shape, dtype=select_dtype, order="C") arr[...] = fill_value else: - arr = np.zeros(np_shape, dtype=dset_dtype, order="C") + arr = np.zeros(np_shape, dtype=select_dtype, order="C") crawler = ChunkCrawler( app, @@ -543,6 +563,7 @@ async def doReadSelection( query_update=query_update, limit=limit, arr=arr, + select_dtype=select_dtype, action="read_chunk_hyperslab", ) await crawler.crawl() @@ -554,9 +575,8 @@ async def doReadSelection( log.info(f"doReadSelection raising BadRequest error: {crawler_status}") raise HTTPBadRequest() if crawler_status not in (200, 201): - log.info( - f"doReadSelection raising HTTPInternalServerError for status: {crawler_status}" - ) + msg = f"doReadSelection raising HTTPInternalServerError for status: {crawler_status}" + log.info(msg) raise HTTPInternalServerError() if query is not None: @@ -695,6 +715,77 @@ async def getAllocatedChunkIds(app, dset_id, bucket=None): return chunk_ids +async def extendShape(app, dset_json, nelements, axis=0, bucket=None): + """ extend the shape of the dataset by nelements along given axis """ + dset_id = dset_json["id"] + datashape = dset_json["shape"] + dims = getShapeDims(datashape) + rank = len(dims) + log.info(f"extendShape of {dset_id} dims: {dims} by {nelements} on axis: {axis}") + # do some sanity checks here + if rank == 0: + msg = "can't change shape of scalar or null space dataset" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + if axis >= rank: + msg = f"extendShape, invalid axis {axis} for dataset of rank: {rank}" + log.warn(msg) + raise HTTPBadRequest(reason=msg) + + req = getDataNodeUrl(app, dset_id) + "/datasets/" + dset_id + "/shape" + body = {"extend": nelements, "extend_dim": axis} + params = {} + if bucket: + params["bucket"] = bucket + selection = None + try: + shape_rsp = await http_put(app, req, data=body, params=params) + log.info(f"got shape put rsp: {shape_rsp}") + if "selection" in shape_rsp: + selection = shape_rsp["selection"] + except HTTPConflict: + log.warn("got 409 extending dataspace for PUT value") + raise + if not selection: + log.error("expected to get selection in PUT shape response") + raise HTTPInternalServerError() + + # selection should be in the format [:,n:m,:]. 
+ # extract n and m and use it to update the slice for the + # appending dimension + if not selection.startswith("[") or not selection.endswith("]"): + log.error("Unexpected selection in PUT shape response") + raise HTTPInternalServerError() + selection = selection[1:-1] # strip off bracketss + fields = selection.split(",") + msg = f"extendShape - unexpected response for dataset of rank {rank}: {selection}" + if len(fields) != rank: + log.error(msg) + raise HTTPInternalServerError() + slices = [] + for (field, extent) in zip(fields, dims): + if field == ":": + s = slice(0, extent, 1) + else: + bounds = field.split(":") + if len(bounds) != 2: + # reuse msg + log.error(msg) + raise HTTPInternalServerError() + try: + lb = int(bounds[0]) + ub = int(bounds[1]) + except ValueError: + # reuse msg + log.error(msg) + raise HTTPInternalServerError() + s = slice(lb, ub, 1) + slices.append(s) + + log.debug(f"extendShape returning slices: {slices}") + return slices + + async def reduceShape(app, dset_json, shape_update, bucket=None): """ Given an existing dataset and a new shape, Reinitialize any edge chunks and delete any chunks diff --git a/hsds/hsds_logger.py b/hsds/hsds_logger.py index d7a623c9..3421d981 100644 --- a/hsds/hsds_logger.py +++ b/hsds/hsds_logger.py @@ -158,6 +158,8 @@ def request(req): if max_task_count and active_tasks > max_task_count: warning(f"more than {max_task_count} tasks, returning 503") raise HTTPServiceUnavailable() + else: + debug(f"active_tasks: {active_tasks} max_tasks: {max_task_count}") def response(req, resp=None, code=None, message=None): @@ -178,9 +180,28 @@ def response(req, resp=None, code=None, message=None): log_level = config["log_level"] - if log_level <= level: + if log_level == DEBUG: prefix = config["prefix"] ts = _timestamp() + num_tasks = len(asyncio.all_tasks()) + active_tasks = _activeTaskCount() + + debug(f"rsp - num tasks: {num_tasks} active tasks: {active_tasks}") + s = "{}{} RSP> <{}> ({}): {}" print(s.format(prefix, ts, code, message, req.path)) + + elif log_level <= level: + prefix = config["prefix"] + ts = _timestamp() + + num_tasks = len(asyncio.all_tasks()) + active_tasks = _activeTaskCount() + + debug(f"num tasks: {num_tasks} active tasks: {active_tasks}") + + s = "{}{} RSP> <{}> ({}): {}" + print(s.format(prefix, ts, code, message, req.path)) + else: + pass diff --git a/hsds/util/chunkUtil.py b/hsds/util/chunkUtil.py index 1e043fad..fef87e57 100644 --- a/hsds/util/chunkUtil.py +++ b/hsds/util/chunkUtil.py @@ -491,9 +491,7 @@ def getChunkIds(dset_id, selection, layout, dim=0, prefix=None, chunk_ids=None): chunk_ids.append(chunk_id) else: chunk_id += "_" # dimension seperator - getChunkIds( - dset_id, selection, layout, dim + 1, chunk_id, chunk_ids - ) + getChunkIds(dset_id, selection, layout, dim + 1, chunk_id, chunk_ids) last_chunk_index = chunk_index # got the complete list, return it! 
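For context on the selection parsing added to extendShape above: the DN's PUT shape response reports where the appended block landed as a bracketed selection string, and the SN converts it into per-dimension slices. A minimal standalone sketch of that conversion, in plain Python and independent of the hsds helpers (the sample selection value below is hypothetical):

def selection_to_slices(selection, dims):
    # selection looks like "[:,10:15]" - one field per dimension;
    # ":" means the full (pre-extend) extent, "n:m" the appended range
    fields = selection[1:-1].split(",")
    slices = []
    for field, extent in zip(fields, dims):
        if field == ":":
            slices.append(slice(0, extent, 1))
        else:
            lb, ub = (int(x) for x in field.split(":"))
            slices.append(slice(lb, ub, 1))
    return slices

# appending 5 rows along axis 1 of a dataset whose shape was (100, 10)
# might yield "[:,10:15]", i.e. [slice(0, 100, 1), slice(10, 15, 1)]
print(selection_to_slices("[:,10:15]", (100, 10)))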
@@ -747,7 +745,7 @@ def next(self): return chunk_id -def chunkReadSelection(chunk_arr, slices=None): +def chunkReadSelection(chunk_arr, slices=None, select_dt=None): """ Return data from requested chunk and selection """ @@ -763,6 +761,10 @@ def chunkReadSelection(chunk_arr, slices=None): log.debug(f"got selection: {slices}") slices = tuple(slices) + if select_dt is None: + # no field selection + select_dt = chunk_arr.dtype + if len(slices) != rank: msg = "Selection rank does not match shape rank" raise ValueError(msg) @@ -773,6 +775,21 @@ def chunkReadSelection(chunk_arr, slices=None): # get requested data output_arr = chunk_arr[slices] + if len(select_dt) < len(dt): + # do a field selection + log.debug(f"select_dtype: {select_dt}") + # create an array with just the given fields + arr = np.zeros(output_arr.shape, select_dt) + # slot in each of the given fields + fields = select_dt.names + if len(fields) > 1: + for field in fields: + arr[field] = output_arr[field] + log.debug(f"arr: {arr}") + else: + arr[...] = output_arr[fields[0]] + output_arr = arr # return this + return output_arr @@ -780,12 +797,15 @@ def chunkWriteSelection(chunk_arr=None, slices=None, data=None): """ Write data for requested chunk and selection """ + + log.debug(f"chunkWriteSelection for slices: {slices}") dims = chunk_arr.shape + log.debug(f"data: {data}") rank = len(dims) if rank == 0: - msg = "No dimension passed to chunkReadSelection" + msg = "No dimension passed to chunkWriteSelection" log.error(msg) raise ValueError(msg) if len(slices) != rank: @@ -797,25 +817,57 @@ def chunkWriteSelection(chunk_arr=None, slices=None, data=None): log.error(msg) raise ValueError(msg) + field_update = False + if len(data.dtype) > 0: + if len(data.dtype) < len(chunk_arr.dtype): + field_update = True + log.debug(f"ChunkWriteSelection for fields: {data.dtype.names}") + else: + log.debug("ChunkWriteSelection for all fields") + updated = False - # check if the new data modifies the array or not - # TBD - is this worth the cost of comparing two arrays element by element? try: - if not ndarray_compare(chunk_arr[slices], data): - # if not np.array_equal(chunk_arr[slices], data): - # update chunk array - chunk_arr[slices] = data - updated = True + if field_update: + arr = chunk_arr[slices] + # update each field of the selected region in the chunk + updated = False + field_updates = [] + for field in data.dtype.names: + if not ndarray_compare(arr[field], data[field]): + # update the field + arr[field] = data[field] + updated = True + field_updates.append(field) + if updated: + # write back to the chunk + chunk_arr[slices] = arr[...] + log.debug(f"updated chunk arr for fields: {field_updates}") + else: + # check if the new data modifies the array or not + # TBD - is this worth the cost of comparing two arrays element by element? 
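To illustrate the field-selection path added to chunkReadSelection above: when select_dt names only a subset of the chunk's compound fields, the selected region is copied field by field into an array of the smaller dtype. A minimal NumPy sketch with hypothetical field names, separate from the hsds code:

import numpy as np

chunk_dt = np.dtype([("x", "i4"), ("y", "f4"), ("z", "f8")])
chunk_arr = np.zeros((8,), dtype=chunk_dt)
chunk_arr["x"] = np.arange(8)
chunk_arr["z"] = np.arange(8) * 0.5

# field selection: just "x" and "z"
select_dt = np.dtype([("x", chunk_dt["x"]), ("z", chunk_dt["z"])])
selected = chunk_arr[2:5]                    # hyperslab selection
out = np.zeros(selected.shape, dtype=select_dt)
for name in select_dt.names:                 # slot in each requested field
    out[name] = selected[name]
print(out)    # x/z pairs for rows 2..4: (2, 1.0), (3, 1.5), (4, 2.0)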
+ log.debug(f"ndcompare: {chunk_arr[slices]} to {data}") + if not ndarray_compare(chunk_arr[slices], data): + # update chunk array + chunk_arr[slices] = data + updated = True except ValueError as ve: msg = f"array_equal ValueError, chunk_arr[{slices}]: {chunk_arr[slices]} " msg += f"data: {data}, data type: {type(data)} ve: {ve}" log.error(msg) raise + log.debug(f"ChunkWriteSelection - chunk updated: {updated}") + log.debug(f"chunk_arr: {chunk_arr}") + return updated -def chunkReadPoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr=None): +def chunkReadPoints(chunk_id=None, + chunk_layout=None, + chunk_arr=None, + point_arr=None, + select_dt=None + ): """ Read points from given chunk """ @@ -830,6 +882,8 @@ def chunkReadPoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr= raise ValueError(msg) dset_dtype = chunk_arr.dtype + if select_dt is None: + select_dt = dset_dtype # no field selection # verify chunk_layout if len(chunk_layout) != rank: @@ -853,19 +907,32 @@ def chunkReadPoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr= log.debug(f"got {num_points} points") - output_arr = np.zeros((num_points,), dtype=dset_dtype) + output_arr = np.zeros((num_points,), dtype=select_dt) chunk_coord = getChunkCoordinate(chunk_id, chunk_layout) for i in range(num_points): + # TBD: there's likely a better way to do this that + # doesn't require iterating through each point... point = point_arr[i, :] tr_point = getChunkRelativePoint(chunk_coord, point) val = chunk_arr[tuple(tr_point)] + if len(select_dt) < len(dset_dtype): + # just update the relevant fields + subfield_val = [] + for (x, field) in zip(val, dset_dtype.names): + if field in select_dt.names: + subfield_val.append(x) + val = tuple(subfield_val) output_arr[i] = val return output_arr -def chunkWritePoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr=None): +def chunkWritePoints(chunk_id=None, + chunk_layout=None, + chunk_arr=None, + point_arr=None, + select_dt=None): """ Write points to given chunk """ @@ -883,11 +950,13 @@ def chunkWritePoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr msg = "Expected point array to be one dimensional" raise ValueError(msg) dset_dtype = chunk_arr.dtype + if select_dt is None: + select_dt = dset_dtype # no field selection log.debug(f"dtype: {dset_dtype}") log.debug(f"point_arr: {point_arr}") # point_arr should have the following type: - # (coord1, coord2, ...) | dset_dtype + # (coord1, coord2, ...) 
| select_dtype comp_dtype = point_arr.dtype if len(comp_dtype) != 2: msg = "expected compound type for point array" @@ -905,7 +974,7 @@ def chunkWritePoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr msg = "unexpected shape for point array" raise ValueError(msg) dt_1 = comp_dtype[1] - if dt_1 != dset_dtype: + if dt_1 != select_dt: msg = "unexpected dtype for point array" raise ValueError(msg) @@ -932,6 +1001,17 @@ def chunkWritePoints(chunk_id=None, chunk_layout=None, chunk_arr=None, point_arr log.debug(f"relative coordinate: {coord}") val = elem[1] # value + if len(select_dt) < len(dset_dtype): + # get the element from the chunk + chunk_val = list(chunk_arr[coord]) + # and just update the relevant fields + index = 0 + for (x, field) in zip(val, dset_dtype.names): + if field in select_dt.names: + chunk_val[index] = x + index += 1 + val = tuple(chunk_val) # this will get written back + chunk_arr[coord] = val # update the point @@ -1026,9 +1106,7 @@ def getQueryDtype(dt): else: break - dt_fields = [ - (index_name, "uint64"), - ] + dt_fields = [(index_name, "uint64"), ] for i in range(len(dt)): dt_fields.append((dt.names[i], dt[i])) query_dt = np.dtype(dt_fields) @@ -1043,14 +1121,15 @@ def chunkQuery( slices=None, query=None, query_update=None, + select_dt=None, limit=0, ): """ Run query on chunk and selection """ - log.debug( - f"chunkQuery - chunk_id: {chunk_id} query: {query} slices: {slices}, limit: {limit}" - ) + msg = f"chunkQuery - chunk_id: {chunk_id} query: {query} slices: {slices}, " + msg += f"limit: {limit} select_dt: {select_dt}" + log.debug(msg) if not isinstance(chunk_arr, np.ndarray): raise TypeError("unexpected array type") @@ -1060,6 +1139,8 @@ def chunkQuery( rank = len(dims) dset_dt = chunk_arr.dtype + if select_dt is None: + select_dt = dset_dt if rank != 1: msg = "Query operations only supported on one-dimensional datasets" @@ -1149,9 +1230,10 @@ def chunkQuery( for i in range(nrows): where_indices[i] = where_indices[i] + (s.step - 1) * i - dt_rsp = getQueryDtype(dset_dt) + dt_rsp = getQueryDtype(select_dt) # construct response array rsp_arr = np.zeros((nrows,), dtype=dt_rsp) + field_names = select_dt.names for field in field_names: rsp_arr[field] = where_result[field] index_name = dt_rsp.names[0] diff --git a/hsds/util/dsetUtil.py b/hsds/util/dsetUtil.py index f610a5d4..330f8aed 100644 --- a/hsds/util/dsetUtil.py +++ b/hsds/util/dsetUtil.py @@ -234,6 +234,27 @@ def isNullSpace(dset_json): return False +def isScalarSpace(dset_json): + """ return true if this is a scalar dataset """ + datashape = dset_json["shape"] + is_scalar = False + if datashape["class"] == "H5S_NULL": + is_scalar = False + elif datashape["class"] == "H5S_SCALAR": + is_scalar = True + else: + if "dims" not in datashape: + log.warn(f"expected to find dims key in shape_json: {datashape}") + is_scalar = False + else: + dims = datashape["dims"] + if len(dims) == 0: + # guess this properly be a H5S_SCALAR class + # but treat this as equivalent + is_scalar = True + return is_scalar + + def getHyperslabSelection(dsetshape, start=None, stop=None, step=None): """ Get slices given lists of start, stop, step values @@ -376,6 +397,24 @@ def getShapeDims(shape): return dims +def isSelectAll(slices, dims): + """ return True if the selection covers the entire dataspace """ + if len(slices) != len(dims): + raise ValueError("isSelectAll - dimensions don't match") + is_all = True + for (s, dim) in zip(slices, dims): + if s.step is not None and s.step != 1: + is_all = False + break + if s.start != 0: + 
is_all = False + break + if s.stop != dim: + is_all = False + break + return is_all + + def getQueryParameter(request, query_name, body=None, default=None): """ Herlper function, get query parameter value from request. @@ -428,29 +467,22 @@ def _getSelectionStringFromRequestBody(body): raise KeyError("no start key") start_val = body["start"] if not isinstance(start_val, (list, tuple)): - start_val = [ - start_val, - ] + start_val = [start_val, ] rank = len(start_val) if "stop" not in body: raise KeyError("no stop key") stop_val = body["stop"] if not isinstance(stop_val, (list, tuple)): - stop_val = [ - stop_val, - ] + stop_val = [stop_val, ] if len(stop_val) != rank: raise ValueError("start and stop values have different ranks") if "step" in body: step_val = body["step"] if not isinstance(step_val, (list, tuple)): - step_val = [ - step_val, - ] + step_val = [step_val, ] if len(step_val) != rank: - raise ValueError( - "step values have differnt rank from start and stop selections" - ) + msg = "step values have differnt rank from start and stop selections" + raise ValueError(msg) else: step_val = None selection = [] @@ -518,7 +550,7 @@ def _getSelectElements(select): def getSelectionList(select, dims): """Return tuple of slices and/or coordinate list for the given selection""" select_list = [] - + log.debug(f"getSelectionList, {select} dims: {dims}") if isinstance(select, dict): select = _getSelectionStringFromRequestBody(select) @@ -577,9 +609,8 @@ def getSelectionList(select, dims): except ValueError: raise ValueError(f"Invalid selection - start value for dim {dim}") if start < 0 or start >= extent: - raise ValueError( - f"Invalid selection - start value out of range for dim {dim}" - ) + msg = f"Invalid selection - start value out of range for dim {dim}" + raise ValueError(msg) if len(fields[1]) == 0: stop = extent else: @@ -588,9 +619,8 @@ def getSelectionList(select, dims): except ValueError: raise ValueError(f"Invalid selection - stop value for dim {dim}") if stop < 0 or stop > extent or stop <= start: - raise ValueError( - f"Invalid selection - stop value out of range for dim {dim}" - ) + msg = f"Invalid selection - stop value out of range for dim {dim}" + raise ValueError(msg) if len(fields) == 3: # get step value if len(fields[2]) == 0: @@ -599,13 +629,11 @@ def getSelectionList(select, dims): try: step = int(fields[2]) except ValueError: - raise ValueError( - f"Invalid selection - step value for dim {dim}" - ) + msg = f"Invalid selection - step value for dim {dim}" + raise ValueError(msg) if step <= 0: - raise ValueError( - f"Invalid selection - step value out of range for dim {dim}" - ) + msg = f"Invalid selection - step value out of range for dim {dim}" + raise ValueError(msg) else: step = 1 s = slice(start, stop, step) @@ -617,13 +645,12 @@ def getSelectionList(select, dims): except ValueError: raise ValueError(f"Invalid selection - index value for dim {dim}") if index < 0 or index >= extent: - raise ValueError( - f"Invalid selection - index value out of range for dim {dim}" - ) - + msg = f"Invalid selection - index value out of range for dim {dim}" + raise ValueError(msg) s = slice(index, index + 1, 1) select_list.append(s) # end dimension loop + log.debug(f"select_list: {select_list}") return tuple(select_list) @@ -883,12 +910,13 @@ def getChunkLayout(dset_json): def getChunkInitializer(dset_json): """ get initializer application and arguments if set """ initializer = None - log.debug(f"getChunkInitializer({dset_json})") if "creationProperties" in dset_json: cprops = 
dset_json["creationProperties"] log.debug(f"get creationProperties: {cprops}") if "initializer" in cprops: initializer = cprops["initializer"] + dset_id = dset_json["id"] + log.debug(f"returning chunk initializer: {initializer} for dset: {dset_id}") return initializer diff --git a/hsds/util/hdf5dtype.py b/hsds/util/hdf5dtype.py index 0bfc2628..8d6d123c 100644 --- a/hsds/util/hdf5dtype.py +++ b/hsds/util/hdf5dtype.py @@ -822,3 +822,25 @@ def getBaseTypeJson(type_name): else: raise TypeError("Invalid type name") return type_json + + +def getSubType(dt_parent, fields): + """ Return a dtype that is a compound type composed of + the fields given in the field_names list + """ + if len(dt_parent) == 0: + raise TypeError("getSubType - parent must be compound type") + if not fields: + raise TypeError("null field specification") + if isinstance(fields, str): + fields = [fields,] # convert to a list + + field_names = set(dt_parent.names) + dt_items = [] + for field in fields: + if field not in field_names: + raise TypeError(f"field: {field} is not defined in parent type") + dt_items.append((field, dt_parent[field])) + dt = np.dtype(dt_items) + + return dt diff --git a/hsds/util/rangegetUtil.py b/hsds/util/rangegetUtil.py index 5c7846fc..40c13fd4 100644 --- a/hsds/util/rangegetUtil.py +++ b/hsds/util/rangegetUtil.py @@ -51,7 +51,7 @@ def _chunk_dist(chunk_left, chunk_right): def _find_min_pair(h5chunks, max_gap=None): """ Given a list of chunk_map entries which are sorted by offset, return the indicies of the two chunks nearest to each other in the file. - If max_gap is set, chunms must be within max_gap bytes + If max_gap is set, chunks must be within max_gap bytes """ num_chunks = len(h5chunks) diff --git a/hsds/util/storUtil.py b/hsds/util/storUtil.py index edbdc9fa..b80b7b3a 100644 --- a/hsds/util/storUtil.py +++ b/hsds/util/storUtil.py @@ -220,10 +220,13 @@ def _uncompress(data, compressor=None, shuffle=0, level=None, dtype=None, chunk_ raise HTTPInternalServerError() if shuffle: data = _unshuffle(shuffle, data, dtype=dtype, chunk_shape=chunk_shape) - finish_time = time.time() - elapsed = finish_time - start_time - msg = f"uncompressed {len(data)} bytes, {(elapsed):.3f}s elapsed" - log.debug(msg) + + if compressor or shuffle: + # log the decompression time + finish_time = time.time() + elapsed = finish_time - start_time + msg = f"uncompressed {len(data)} bytes, {(elapsed):.3f}s elapsed" + log.debug(msg) return data diff --git a/pyproject.toml b/pyproject.toml index e693e13c..482c640d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", ] requires-python = ">=3.8" -version = "0.8.4" +version = "0.8.5" dependencies = [ "aiohttp == 3.8.5", diff --git a/tests/integ/pointsel_test.py b/tests/integ/pointsel_test.py index f14ad053..0eddfa9a 100755 --- a/tests/integ/pointsel_test.py +++ b/tests/integ/pointsel_test.py @@ -109,9 +109,7 @@ def testPost1DDataset(self): self.assertTrue("value" in rspJson) ret_value = rspJson["value"] self.assertEqual(len(ret_value), len(points)) - expected_result = [ - 0, - ] * len(points) + expected_result = [0, ] * len(points) self.assertEqual(ret_value, expected_result) # write to the dset @@ -706,15 +704,12 @@ def testPostChunkedRefDataset(self): # do a point selection req = self.endpoint + "/datasets/" + dset_id + "/value" - points = [ - 1234567, - ] + points = [1234567, ] body = {"points": points} rsp = self.session.post(req, data=json.dumps(body), headers=headers) if 
rsp.status_code == 404: - print( - "s3object: {} not found, skipping point chunk ref test".format(s3path) - ) + msg = "s3object: {s3path} not found, skipping point chunk ref test" + print(msg) else: self.assertEqual(rsp.status_code, 200) rspJson = json.loads(rsp.text) @@ -732,9 +727,9 @@ def testPostChunkedRefIndirectDataset(self): hdf5_sample_bucket = config.get("hdf5_sample_bucket") if not hdf5_sample_bucket: - print( - "hdf5_sample_bucket config not set, skipping testPostChunkedRefIndirectDataset" - ) + msg = "hdf5_sample_bucket config not set, " + msg += "skipping testPostChunkedRefIndirectDataset" + print(msg) return s3path = "s3://" + hdf5_sample_bucket + "/data/hdf5test" + "/snp500.h5" @@ -748,9 +743,7 @@ def testPostChunkedRefIndirectDataset(self): if "snp500.h5" not in snp500_json: self.assertTrue(False) - chunk_dims = [ - 60000, - ] # chunk layout used in snp500.h5 file + chunk_dims = [60000,] # chunk layout used in snp500.h5 file num_chunks = (SNP500_ROWS // chunk_dims[0]) + 1 chunk_info = snp500_json["snp500.h5"] @@ -868,17 +861,13 @@ def testPostChunkedRefIndirectDataset(self): # do a point selection req = self.endpoint + "/datasets/" + dset_id + "/value" - points = [ - 1234567, - ] + points = [1234567, ] body = {"points": points} rsp = self.session.post(req, data=json.dumps(body), headers=headers) if rsp.status_code == 404: - print( - "s3object: {} not found, skipping point read chunk reference indirect test".format( - s3path - ) - ) + msg = f"s3object: {s3path} not found, " + msg += "skipping point read chunk reference indirect test" + print(msg) return self.assertEqual(rsp.status_code, 200) @@ -911,9 +900,7 @@ def testPut1DDataset(self): data["creationProperties"] = { "layout": { "class": "H5D_CHUNKED", - "dims": [ - 20, - ], + "dims": [20, ], } } @@ -960,9 +947,7 @@ def testPut1DDataset(self): 97, ] # write 1's at indexes that are prime - value = [ - 1, - ] * len(primes) + value = [1,] * len(primes) # write 1's to all the prime indexes payload = {"points": primes, "value": value} @@ -1038,9 +1023,7 @@ def testPut2DDataset(self): points = [] for i in range(20): points.append((i, i)) - value = [ - 1, - ] * 20 + value = [1, ] * 20 # write 1's to all the point locations payload = {"points": points, "value": value} @@ -1084,9 +1067,7 @@ def testPut1DDatasetBinary(self): data["creationProperties"] = { "layout": { "class": "H5D_CHUNKED", - "dims": [ - 20, - ], + "dims": [20, ], } } @@ -1348,9 +1329,7 @@ def testScalarDataset(self): self.assertTrue(helper.validateId(dset_id)) # write to the dset - data = [ - 42, - ] + data = [42, ] payload = {"value": data} req = self.endpoint + "/datasets/" + dset_id + "/value" @@ -1358,9 +1337,7 @@ def testScalarDataset(self): rsp = self.session.put(req, data=json.dumps(payload), headers=headers) self.assertEqual(rsp.status_code, 200) - points = [ - 0, - ] + points = [0, ] body = {"points": points} # read selected points rsp = self.session.post(req, data=json.dumps(body), headers=headers) @@ -1578,6 +1555,119 @@ def testSelect2DDataset(self): self.assertEqual(len(data), 3 * 4) self.assertEqual(data, b"\x1e\x00\x00\x00 \x00\x00\x00#\x00\x00\x00") + def testPostCompoundDataset(self): + + # Test selecting points in a compound dataset using POST value + print("testPostCompoundDataset", self.base_domain) + + points = [ + 2, + 3, + 5, + 7, + 11, + 13, + 17, + 19, + 23, + 29, + 31, + 37, + 41, + 43, + 47, + 53, + 59, + 61, + 67, + 71, + 73, + 79, + 83, + 97, + 98, + ] + + headers = helper.getRequestHeaders(domain=self.base_domain) + req = self.endpoint 
+ "/" + + # Get root uuid + rsp = self.session.get(req, headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + root_uuid = rspJson["root"] + helper.validateId(root_uuid) + + # create dataset + # pass in layout specification so that we can test selection across chunk boundries + data = {"type": "H5T_STD_I32LE", "shape": (100,)} + # + # create 1d dataset + # + + field_names = ("x1", "X2", "x3", "X4", "X5") + + fields = [] + for field_name in field_names: + field = {"name": field_name, "type": "H5T_STD_I32LE"} + fields.append(field) + + datatype = {"class": "H5T_COMPOUND", "fields": fields} + + num_elements = 100 + payload = {"type": datatype, "shape": num_elements} + + req = self.endpoint + "/datasets" + rsp = self.session.post(req, data=json.dumps(payload), headers=headers) + self.assertEqual(rsp.status_code, 201) + rspJson = json.loads(rsp.text) + dset_id = rspJson["id"] + self.assertTrue(helper.validateId(dset_id)) + + # link new dataset as 'dset_compound' + name = "dset" + req = self.endpoint + "/groups/" + root_uuid + "/links/" + name + payload = {"id": dset_id} + rsp = self.session.put(req, data=json.dumps(payload), headers=headers) + self.assertEqual(rsp.status_code, 201) + + # try reading points from uninitialized chunks + body = {"points": points} + req = self.endpoint + "/datasets/" + dset_id + "/value" + rsp = self.session.post(req, data=json.dumps(body), headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + self.assertTrue("value" in rspJson) + ret_value = rspJson["value"] + self.assertEqual(len(ret_value), len(points)) + for i in range(len(points)): + self.assertEqual(ret_value[i], [0, 0, 0, 0, 0]) + + # write to the dset by fields + for field in field_names: + x = int(field[1]) # get the number part of the field name + data = [(x * i) for i in range(num_elements)] + + payload = {"value": data, "fields": field} + req = self.endpoint + "/datasets/" + dset_id + "/value" + + rsp = self.session.put(req, data=json.dumps(payload), headers=headers) + self.assertEqual(rsp.status_code, 200) + + # read back selected points by field + for field in field_names: + x = int(field[1]) + body = {"points": points, "fields": field} + rsp = self.session.post(req, data=json.dumps(body), headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + self.assertTrue("value" in rspJson) + ret_value = rspJson["value"] + self.assertEqual(len(ret_value), len(points)) + for i in range(len(points)): + self.assertEqual(ret_value[i], [x * points[i]]) + return + if __name__ == "__main__": # setup test files diff --git a/tests/integ/query_test.py b/tests/integ/query_test.py index 2ca5c38e..4bfac9a6 100644 --- a/tests/integ/query_test.py +++ b/tests/integ/query_test.py @@ -160,32 +160,54 @@ def verifyQueryRsp(rsp, expected_indices=None, expect_bin=None): req = self.endpoint + "/datasets/" + dset_uuid + "/value" for query_headers in (headers, headers_bin_rsp): + kwargs = {} + if query_headers.get("accept") == "application/octet-stream": - expect_bin = True + kwargs["expect_bin"] = True else: - expect_bin = False + kwargs["expect_bin"] = False + # read first row with AAPL params = {"query": "stock_symbol == b'AAPL'", "Limit": 1} - expected_indices = (1,) rsp = self.session.get(req, params=params, headers=query_headers) - verifyQueryRsp( - rsp, expected_indices=expected_indices, expect_bin=expect_bin - ) + kwargs["expected_indices"] = (1,) + + verifyQueryRsp(rsp, **kwargs) # read all rows with APPL params = 
{"query": "stock_symbol == b'AAPL'"} rsp = self.session.get(req, params=params, headers=query_headers) - # self.assertTrue("hrefs" in rspJson) - verifyQueryRsp(rsp, expected_indices=(1, 4, 7, 10), expect_bin=expect_bin) + expected_indices = (1, 4, 7, 10) + kwargs["expected_indices"] = expected_indices + + verifyQueryRsp(rsp, **kwargs) + + # return just open and close fields + params = {"query": "stock_symbol == b'AAPL'", "fields": "open:close"} + # just do json to keep the verification simple + rsp = self.session.get(req, params=params, headers=headers) + # need to check this one by hand + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + query_rsp = rspJson["value"] + self.assertEqual(len(query_rsp), 4) + for i in range(4): + item = query_rsp[i] + self.assertEqual(len(item), 3) + self.assertEqual(item[0], expected_indices[i]) + # expected_indices will be the same params["select"] = "[2:12]" + del params["fields"] # remove key from last test rsp = self.session.get(req, params=params, headers=query_headers) - verifyQueryRsp(rsp, expected_indices=(4, 7, 10), expect_bin=expect_bin) + kwargs["expected_indices"] = (4, 7, 10) + verifyQueryRsp(rsp, **kwargs) # combine with Limit params["Limit"] = 2 rsp = self.session.get(req, params=params, headers=query_headers) - verifyQueryRsp(rsp, expected_indices=(4, 7), expect_bin=expect_bin) + kwargs["expected_indices"] = (4, 7) + verifyQueryRsp(rsp, **kwargs) # try bad Limit params["Limit"] = "abc" @@ -211,15 +233,16 @@ def verifyQueryRsp(rsp, expected_indices=None, expect_bin=None): params = {"query": "(open > 3000) & (open < 3100)"} rsp = self.session.get(req, params=params, headers=query_headers) self.assertEqual(rsp.status_code, 200) - verifyQueryRsp( - rsp, expected_indices=(0, 1, 3, 5, 11), expect_bin=expect_bin - ) + kwargs["expected_indices"] = (0, 1, 3, 5, 11) + verifyQueryRsp(rsp, **kwargs) # query for a zero sector field (should return none) params = {"query": "open == 0"} # query for zero sector rsp = self.session.get(req, params=params, headers=headers) self.assertEqual(rsp.status_code, 200) - verifyQueryRsp(rsp, expected_indices=(), expect_bin=False) + kwargs["expected_indices"] = () + kwargs["expect_bin"] = False # will always get json for null response + verifyQueryRsp(rsp, **kwargs) def testChunkedRefIndirectDataset(self): print("testChunkedRefIndirectDatasetQuery", self.base_domain) @@ -243,9 +266,7 @@ def testChunkedRefIndirectDataset(self): if "snp500.h5" not in snp500_json: self.assertTrue(False) - chunk_dims = [ - 60000, - ] # chunk layout used in snp500.h5 file + chunk_dims = [60000, ] # chunk layout used in snp500.h5 file num_chunks = (SNP500_ROWS // chunk_dims[0]) + 1 chunk_info = snp500_json["snp500.h5"] @@ -416,7 +437,7 @@ def testPutQuery(self): datatype = {"class": "H5T_COMPOUND", "fields": fields} num_elements = 12 - payload = {"type": datatype, "shape": num_elements} # , 'maxdims': maxdims} + payload = {"type": datatype, "shape": num_elements} req = self.endpoint + "/datasets" rsp = self.session.post(req, data=json.dumps(payload), headers=headers) self.assertEqual(rsp.status_code, 201) # create dataset diff --git a/tests/integ/setup_test.py b/tests/integ/setup_test.py index da501c4d..744bf386 100755 --- a/tests/integ/setup_test.py +++ b/tests/integ/setup_test.py @@ -49,10 +49,7 @@ def testHomeFolders(self): req = helper.getEndpoint() + '/' params = {"domain": home_domain} - print("req:", req) - print("domain:", home_domain) rsp = self.session.get(req, params=params, headers=headers) - print("/home get 
status:", rsp.status_code) if rsp.status_code == 404: if not admin_headers: @@ -67,7 +64,6 @@ def testHomeFolders(self): data=json.dumps(body), params=params, headers=admin_headers) - print("put request status:", rsp.status_code) self.assertEqual(rsp.status_code, 201) # do the original request again rsp = self.session.get(req, params=params, headers=headers) @@ -76,18 +72,15 @@ def testHomeFolders(self): "set env variable for USER_PASSWORD") self.assertTrue(False) - print("got status code:", rsp.status_code) self.assertEqual(rsp.status_code, 200) rspJson = json.loads(rsp.text) - print("home folder json:", rspJson) for k in ("owner", "created", "lastModified"): self.assertTrue(k in rspJson) self.assertFalse("root" in rspJson) # no root -> folder params = {"domain": user_domain} rsp = self.session.get(req, params=params, headers=headers) - print(f"{user_domain} get status: {rsp.status_code}") if rsp.status_code == 404: if not admin_headers: print(f"{user_domain} folder doesn't exist, set ADMIN_USERNAME " @@ -106,7 +99,6 @@ def testHomeFolders(self): self.assertEqual(rsp.status_code, 200) rspJson = json.loads(rsp.text) - print("user folder:", rspJson) self.assertFalse("root" in rspJson) # no root group for folder domain self.assertTrue("owner" in rspJson) self.assertTrue("hrefs" in rspJson) diff --git a/tests/integ/value_test.py b/tests/integ/value_test.py index 644b2e5d..3d3f53ab 100755 --- a/tests/integ/value_test.py +++ b/tests/integ/value_test.py @@ -1095,9 +1095,54 @@ def testPutCompound(self): self.assertTrue("value" in rspJson) readData = rspJson["value"] + self.assertEqual(len(readData), num_elements) + for i in range(num_elements): + item = readData[i] + self.assertEqual(len(item), 2) + expected = (i * 10, i * 10 + i / 10.0) if i > 0 else (42, 0.42) + self.assertEqual(item[0], expected[0]) + tol = 0.1 # tbd: investiage why results need such a high tolerance + self.assertTrue(abs(item[1] - expected[1]) < tol) + + # read back just the "temp" field of the compound type + params = {"fields": "temp"} + rsp = self.session.get(req, params=params, headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + self.assertTrue("hrefs" in rspJson) + self.assertTrue("value" in rspJson) - self.assertEqual(readData[0][0], 42) - self.assertEqual(readData[1][0], 10) + readData = rspJson["value"] + self.assertEqual(len(readData), num_elements) + for i in range(num_elements): + elem = readData[i] + self.assertEqual(len(elem), 1) + expected = i * 10 if i > 0 else 42 + self.assertEqual(elem[0], expected) + + rev_temp = readData[::-1] # reverse the value list + params = {"fields": "temp"} + payload = {"value": rev_temp} + req = self.endpoint + "/datasets/" + dset1d_uuid + "/value" + rsp = self.session.put(req, data=json.dumps(payload), params=params, headers=headers) + self.assertEqual(rsp.status_code, 200) # write value + + # read back the data again + rsp = self.session.get(req, headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + + readData = rspJson["value"] + self.assertEqual(len(readData), num_elements) + for i in range(num_elements): + item = readData[i] + self.assertEqual(len(item), 2) + x = (num_elements - i - 1) * 10 if i < 9 else 42 + y = i * 10 + i / 10 if i > 0 else 0.42 + expected = (x, y) + self.assertEqual(item[0], expected[0]) + tol = 0.1 + self.assertTrue(abs(item[1] - expected[1]) < tol) # # create 2d dataset @@ -1145,6 +1190,22 @@ def testPutCompound(self): self.assertEqual(readData[0][1], [0, 0.5]) 
self.assertEqual(readData[1][1], [10, 10.5]) + # read back just the "temp" field of the compound type + params = {"fields": "temp"} + rsp = self.session.get(req, params=params, headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + self.assertTrue("hrefs" in rspJson) + self.assertTrue("value" in rspJson) + + readData = rspJson["value"] + for i in range(dims[0]): + row = readData[i] + for j in range(dims[1]): + item = row[j] + self.assertEqual(len(item), 1) + self.assertEqual(item[0], i * 10) + def testSimpleTypeFillValue(self): # test Dataset with simple type and fill value print("testSimpleTypeFillValue", self.base_domain) @@ -1184,16 +1245,12 @@ def testSimpleTypeFillValue(self): rspJson = json.loads(rsp.text) self.assertTrue("hrefs" in rspJson) self.assertTrue("value" in rspJson) - expected_value = [ - 42, - ] + expected_value = [42, ] expected_value *= 10 self.assertEqual(rspJson["value"], expected_value) # write some values - value = [ - 24, - ] + value = [24, ] value *= 5 payload = {"start": 0, "stop": 5, "value": value} rsp = self.session.put(req, data=json.dumps(payload), headers=headers) @@ -1258,9 +1315,7 @@ def testCompoundFillValue(self): rspJson = json.loads(rsp.text) shape = rspJson["shape"] self.assertEqual(shape["class"], "H5S_SIMPLE") - expected_value = [ - 40, - ] + expected_value = [40, ] self.assertEqual(shape["dims"], expected_value) # read the default values @@ -1344,16 +1399,12 @@ def testBigFillValue(self): rspJson = json.loads(rsp.text) self.assertTrue("hrefs" in rspJson) self.assertTrue("value" in rspJson) - expected_value = [ - fill_value, - ] + expected_value = [fill_value, ] expected_value *= 10 self.assertEqual(rspJson["value"], expected_value) # write some values - value = [ - "hello", - ] + value = ["hello", ] value *= 5 payload = {"start": 0, "stop": 5, "value": value} rsp = self.session.put(req, data=json.dumps(payload), headers=headers) @@ -1427,9 +1478,7 @@ def testNaNFillValue(self): self.assertEqual(ret_values[i], None) # write some values - value = [ - 3.12, - ] + value = [3.12, ] value *= 5 payload = {"start": 0, "stop": 5, "value": value} rsp = self.session.put(req, data=json.dumps(payload), headers=headers) @@ -2084,7 +2133,7 @@ def testAppend2DJson(self): # create the dataset with a 0-sized shape req = self.endpoint + "/datasets" - payload = {"type": "H5T_STD_I32LE", "shape": [0, 0], "maxdims": [0, 0]} + payload = {"type": "H5T_STD_I32LE", "shape": [1, 0], "maxdims": [0, 0]} req = self.endpoint + "/datasets" rsp = self.session.post(req, data=json.dumps(payload), headers=headers) self.assertEqual(rsp.status_code, 201) # create dataset @@ -2420,9 +2469,7 @@ def testGetSelectionChunkedRefDataset(self): # read a selection req = self.endpoint + "/datasets/" + dset_id + "/value" - params = { - "select": "[1234567:1234568]" - } # read 1 element, starting at index 1234567 + params = {"select": "[1234567:1234568]"} # read 1 element, starting at index 1234567 params["nonstrict"] = 1 # allow use of aws lambda if configured rsp = self.session.get(req, params=params, headers=headers) if rsp.status_code == 404: @@ -2444,6 +2491,24 @@ def testGetSelectionChunkedRefDataset(self): self.assertEqual(item[2], 3) # skip check rest of fields since float comparisons are trcky... 
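The fields selections exercised in these value tests ride on a simple internal convention, visible in the chunk_crawl and dset_lib changes earlier in this diff: the service node serializes the requested field names as a colon-separated string, and the data node rebuilds a compound sub-dtype from it. A self-contained sketch of that round trip in plain NumPy rather than the hsds getSubType helper (the dataset dtype here is a stand-in modeled on the snp500 test data):

import numpy as np

dset_dtype = np.dtype([("date", "S10"), ("symbol", "S4"),
                       ("open", "f8"), ("close", "f8"), ("sector", "i4")])

# SN side: serialize the requested fields for the DN request params
fields_param = ":".join(["date", "symbol", "sector"])    # "date:symbol:sector"

# DN side: rebuild a compound sub-dtype holding just those fields
field_names = fields_param.split(":")
select_dtype = np.dtype([(name, dset_dtype[name]) for name in field_names])
print(select_dtype)    # [('date', 'S10'), ('symbol', 'S4'), ('sector', '<i4')]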
+ # do a select with just the fields: date, symbol, sector + params["fields"] = "date:symbol:sector" + rsp = self.session.get(req, params=params, headers=headers) + self.assertEqual(rsp.status_code, 200) + rspJson = json.loads(rsp.text) + self.assertTrue("hrefs" in rspJson) + self.assertTrue("value" in rspJson) + value = rspJson["value"] + # should get one element back (still have the select param...) + self.assertEqual(len(value), 1) + item = value[0] + + # verify that this is what we expected to get + self.assertEqual(len(item), 3) + self.assertEqual(item[0], "1998.10.22") + self.assertEqual(item[1], "MHFI") + self.assertEqual(item[2], 3) + def testChunkedRefIndirectDataset(self): test_name = "testChunkedRefIndirectDataset" print("testChunkedRefIndirectDataset", self.base_domain)
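Putting the pieces of this changeset together from the client's point of view: a field-restricted hyperslab read is just a GET on the value endpoint with select and fields query parameters, as the tests above do through the session helper. A hedged sketch using requests directly; the endpoint, credentials, domain, and dataset id below are placeholders, not values from this diff:

import requests

endpoint = "http://localhost:5101"        # assumed local HSDS server
dset_id = "d-00000000-0000-0000-0000-000000000000"   # hypothetical dataset uuid
req = f"{endpoint}/datasets/{dset_id}/value"
params = {
    "domain": "/home/test_user1/snp500.h5",   # placeholder domain path
    "select": "[1234567:1234568]",            # read one element
    "fields": "date:symbol:sector",           # colon-separated field subset
}
rsp = requests.get(req, params=params, auth=("test_user1", "test"))
if rsp.status_code == 200:
    print(rsp.json()["value"])    # e.g. [["1998.10.22", "MHFI", 3]]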