From 026b9ddb62067120e685db2227fd40def650d427 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Tue, 11 Jun 2024 21:17:07 +0000 Subject: [PATCH 1/6] adds latest flag to get timestamps when roots expire --- pychunkedgraph/app/segmentation/common.py | 31 +++++++++++++++++--- pychunkedgraph/app/segmentation/v1/routes.py | 6 +++- pychunkedgraph/graph/chunkedgraph.py | 2 +- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pychunkedgraph/app/segmentation/common.py b/pychunkedgraph/app/segmentation/common.py index ee9da61c1..aadfbefe6 100644 --- a/pychunkedgraph/app/segmentation/common.py +++ b/pychunkedgraph/app/segmentation/common.py @@ -5,6 +5,7 @@ import time from datetime import datetime from functools import reduce +from collections import deque import numpy as np import pandas as pd @@ -1066,7 +1067,7 @@ def handle_is_latest_roots(table_id, is_binary): return cg.is_latest_roots(node_ids, time_stamp=timestamp) -def handle_root_timestamps(table_id, is_binary): +def handle_root_timestamps(table_id, is_binary, latest: bool = False): current_app.request_type = "root_timestamps" current_app.table_id = table_id @@ -1075,11 +1076,33 @@ def handle_root_timestamps(table_id, is_binary): else: node_ids = np.array(json.loads(request.data)["node_ids"], dtype=np.uint64) - # Call ChunkedGraph cg = app_utils.get_cg(table_id) + timestamp = None + if latest: + result = [] + timestamp = _parse_timestamp("timestamp", time.time(), return_datetime=True) + latest_at_ts = cg.is_latest_roots(node_ids, time_stamp=timestamp) + mask = np.array(latest_at_ts, dtype=bool) + non_latest_ids = node_ids[mask] + row_dict = cg.client.read_nodes( + node_ids=non_latest_ids, + properties=attributes.Hierarchy.NewParent, + end_time=timestamp, + ) - timestamps = cg.get_node_timestamps(node_ids, return_numpy=False) - return [ts.timestamp() for ts in timestamps] + new_roots = [] + for v in row_dict.values(): + new_roots.append(v[-1].value) # sorted in descending order + new_roots_ts = deque(cg.get_node_timestamps(new_roots, return_numpy=False)) + for x in latest_at_ts: + if x is True: + result.append(timestamp) + else: + result.append(new_roots_ts.popleft()) + return result + else: + timestamps = cg.get_node_timestamps(node_ids, return_numpy=False) + return [ts.timestamp() for ts in timestamps] ### OPERATION DETAILS ------------------------------------------------------------ diff --git a/pychunkedgraph/app/segmentation/v1/routes.py b/pychunkedgraph/app/segmentation/v1/routes.py index 0f7ac9d9c..e9708bf5e 100644 --- a/pychunkedgraph/app/segmentation/v1/routes.py +++ b/pychunkedgraph/app/segmentation/v1/routes.py @@ -532,7 +532,11 @@ def handle_is_latest_roots(table_id): def handle_root_timestamps(table_id): int64_as_str = request.args.get("int64_as_str", default=False, type=toboolean) is_binary = request.args.get("is_binary", default=False, type=toboolean) - root_timestamps = common.handle_root_timestamps(table_id, is_binary=is_binary) + latest = request.args.get("latest", default=False, type=toboolean) + is_binary = request.args.get("is_binary", default=False, type=toboolean) + root_timestamps = common.handle_root_timestamps( + table_id, is_binary=is_binary, latest=latest + ) resp = {"timestamp": root_timestamps} return jsonify_with_kwargs(resp, int64_as_str=int64_as_str) diff --git a/pychunkedgraph/graph/chunkedgraph.py b/pychunkedgraph/graph/chunkedgraph.py index a5b08ff03..210bff50b 100644 --- a/pychunkedgraph/graph/chunkedgraph.py +++ b/pychunkedgraph/graph/chunkedgraph.py @@ -517,7 +517,7 @@ def is_latest_roots( root_ids: typing.Iterable, time_stamp: typing.Optional[datetime.datetime] = None, ) -> typing.Iterable: - """Determines whether root ids are superseeded.""" + """Determines whether root ids are superseded.""" time_stamp = misc_utils.get_valid_timestamp(time_stamp) row_dict = self.client.read_nodes( From 71bf3c60768fd6fa748f4feec1f89d052b88c33e Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Wed, 12 Jun 2024 13:10:29 +0000 Subject: [PATCH 2/6] fix: use timestamp from column --- pychunkedgraph/app/segmentation/common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pychunkedgraph/app/segmentation/common.py b/pychunkedgraph/app/segmentation/common.py index aadfbefe6..efc2260ab 100644 --- a/pychunkedgraph/app/segmentation/common.py +++ b/pychunkedgraph/app/segmentation/common.py @@ -1090,10 +1090,10 @@ def handle_root_timestamps(table_id, is_binary, latest: bool = False): end_time=timestamp, ) - new_roots = [] + new_roots_ts = [] for v in row_dict.values(): - new_roots.append(v[-1].value) # sorted in descending order - new_roots_ts = deque(cg.get_node_timestamps(new_roots, return_numpy=False)) + new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted in descending order + new_roots_ts = deque(new_roots_ts) for x in latest_at_ts: if x is True: result.append(timestamp) From dd690985399658f20f9f2b26550304ab0611bef2 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Thu, 13 Jun 2024 22:25:31 +0000 Subject: [PATCH 3/6] fix: invert mask, create separate function --- pychunkedgraph/app/segmentation/common.py | 47 ++++++++++++----------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/pychunkedgraph/app/segmentation/common.py b/pychunkedgraph/app/segmentation/common.py index efc2260ab..9efb6a594 100644 --- a/pychunkedgraph/app/segmentation/common.py +++ b/pychunkedgraph/app/segmentation/common.py @@ -1067,6 +1067,29 @@ def handle_is_latest_roots(table_id, is_binary): return cg.is_latest_roots(node_ids, time_stamp=timestamp) +def _handle_latest(cg, node_ids, timestamp): + latest_mask = cg.is_latest_roots(node_ids, time_stamp=timestamp) + non_latest_ids = node_ids[~latest_mask] + row_dict = cg.client.read_nodes( + node_ids=non_latest_ids, + properties=attributes.Hierarchy.NewParent, + end_time=timestamp, + ) + + new_roots_ts = [] + for v in row_dict.values(): + new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted in descending order + new_roots_ts = deque(new_roots_ts) + + result = [] + for x in latest_mask: + if x: + result.append(timestamp.timestamp()) + else: + result.append(new_roots_ts.popleft()) + return result + + def handle_root_timestamps(table_id, is_binary, latest: bool = False): current_app.request_type = "root_timestamps" current_app.table_id = table_id @@ -1077,29 +1100,9 @@ def handle_root_timestamps(table_id, is_binary, latest: bool = False): node_ids = np.array(json.loads(request.data)["node_ids"], dtype=np.uint64) cg = app_utils.get_cg(table_id) - timestamp = None + timestamp = _parse_timestamp("timestamp", time.time(), return_datetime=True) if latest: - result = [] - timestamp = _parse_timestamp("timestamp", time.time(), return_datetime=True) - latest_at_ts = cg.is_latest_roots(node_ids, time_stamp=timestamp) - mask = np.array(latest_at_ts, dtype=bool) - non_latest_ids = node_ids[mask] - row_dict = cg.client.read_nodes( - node_ids=non_latest_ids, - properties=attributes.Hierarchy.NewParent, - end_time=timestamp, - ) - - new_roots_ts = [] - for v in row_dict.values(): - new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted in descending order - new_roots_ts = deque(new_roots_ts) - for x in latest_at_ts: - if x is True: - result.append(timestamp) - else: - result.append(new_roots_ts.popleft()) - return result + return _handle_latest(cg, node_ids, timestamp) else: timestamps = cg.get_node_timestamps(node_ids, return_numpy=False) return [ts.timestamp() for ts in timestamps] From 0e70a2e3f43e350e3ce6bcf4d222ac06c80b2fda Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Mon, 17 Jun 2024 11:05:35 +0200 Subject: [PATCH 4/6] =?UTF-8?q?Bump=20version:=202.16.0=20=E2=86=92=202.17?= =?UTF-8?q?.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- pychunkedgraph/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 2767c6b3d..6ad6c1041 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.16.0 +current_version = 2.17.0 commit = True tag = True diff --git a/pychunkedgraph/__init__.py b/pychunkedgraph/__init__.py index 8f4a35170..a6b62ff3b 100644 --- a/pychunkedgraph/__init__.py +++ b/pychunkedgraph/__init__.py @@ -1 +1 @@ -__version__ = "2.16.0" +__version__ = "2.17.0" From 7165f1cc57cf01001a1943db0c2061b9e033fc06 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Sat, 22 Jun 2024 15:51:51 +0000 Subject: [PATCH 5/6] fix: handle timestamp from before node was created --- pychunkedgraph/app/segmentation/common.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pychunkedgraph/app/segmentation/common.py b/pychunkedgraph/app/segmentation/common.py index 9efb6a594..bddcab0aa 100644 --- a/pychunkedgraph/app/segmentation/common.py +++ b/pychunkedgraph/app/segmentation/common.py @@ -1077,8 +1077,12 @@ def _handle_latest(cg, node_ids, timestamp): ) new_roots_ts = [] - for v in row_dict.values(): - new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted in descending order + for n in node_ids: + try: + v = row_dict[n] + new_roots_ts.append(v[-1].timestamp.timestamp()) # sorted descending + except KeyError: + new_roots_ts.append(0) new_roots_ts = deque(new_roots_ts) result = [] From f96ca9b9605150f92ab8827dc53570cfa34acb4a Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Fri, 16 Aug 2024 20:21:15 +0000 Subject: [PATCH 6/6] remove version conflict --- .bumpversion.cfg | 2 +- pychunkedgraph/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6ad6c1041..b1b19c5b1 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.17.0 +current_version = 2.16.1 commit = True tag = True diff --git a/pychunkedgraph/__init__.py b/pychunkedgraph/__init__.py index a6b62ff3b..dc9511ccb 100644 --- a/pychunkedgraph/__init__.py +++ b/pychunkedgraph/__init__.py @@ -1 +1 @@ -__version__ = "2.17.0" +__version__ = "2.16.1"