Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skeleton dev2 #286

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 56 additions & 12 deletions caveclient/skeletonservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from io import BytesIO, StringIO
from typing import List, Literal, Optional, Union

import numpy as np
import pandas as pd
from cachetools import TTLCache, cached
from packaging.version import Version
Expand All @@ -22,6 +23,9 @@

SERVER_KEY = "skeleton_server_address"

# Cap on the number of root ids accepted by one bulk asynchronous skeleton
# request; longer lists are truncated to this many entries with a warning.
MAX_BULK_ASYNCHRONOUS_SKELETONS = 10000
# Number of root ids sent to the bulk async endpoint per HTTP request; the
# (possibly truncated) root id list is submitted in slices of this size.
BULK_ASYNC_SKELETONS_BATCH_SIZE = 100


class NoL2CacheException(Exception):
def __init__(self, value=""):
Expand Down Expand Up @@ -375,6 +379,9 @@
f"Unknown skeleton version: {skeleton_version}. Valid options: {valid_skeleton_versions}"
)

if isinstance(root_ids, np.ndarray):
root_ids = root_ids.tolist()

Check warning on line 383 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L382-L383

Added lines #L382 - L383 were not covered by tests

if isinstance(root_ids, int):
root_ids = str(root_ids)
elif isinstance(root_ids, List):
Expand Down Expand Up @@ -669,20 +676,57 @@
)
skeleton_version = -1

url = self._build_bulk_async_endpoint(
root_ids, datastack_name, skeleton_version
)
response = self.session.get(url)
self.raise_for_status(response, log_warning=log_warning)
if isinstance(root_ids, np.ndarray):
root_ids = root_ids.tolist()
if not isinstance(root_ids, list):
raise ValueError(

Check warning on line 682 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L679-L682

Added lines #L679 - L682 were not covered by tests
f"root_ids must be a list or numpy array of root_ids, not a {type(root_ids)}"
)

estimated_async_time_secs_upper_bound = float(response.text)
if len(root_ids) > MAX_BULK_ASYNCHRONOUS_SKELETONS:
logging.warning(

Check warning on line 687 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L686-L687

Added lines #L686 - L687 were not covered by tests
f"The number of root_ids exceeds the current limit of {MAX_BULK_ASYNCHRONOUS_SKELETONS}. Only the first {MAX_BULK_ASYNCHRONOUS_SKELETONS} will be processed."
)
root_ids = root_ids[:MAX_BULK_ASYNCHRONOUS_SKELETONS]

Check warning on line 690 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L690

Added line #L690 was not covered by tests

if verbose_level >= 1:
logging.info(
f"Queued asynchronous skeleton generation for root_ids: {root_ids}"
estimated_async_time_secs_upper_bound_sum = 0
for batch in range(0, len(root_ids), BULK_ASYNC_SKELETONS_BATCH_SIZE):
rids_one_batch = root_ids[batch : batch + BULK_ASYNC_SKELETONS_BATCH_SIZE]

Check warning on line 694 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L692-L694

Added lines #L692 - L694 were not covered by tests

url = self._build_bulk_async_endpoint(

Check warning on line 696 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L696

Added line #L696 was not covered by tests
rids_one_batch, datastack_name, skeleton_version
)
logging.info(
f"Upper estimate to generate {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound} seconds"
response = self.session.get(url)
self.raise_for_status(response, log_warning=log_warning)

Check warning on line 700 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L699-L700

Added lines #L699 - L700 were not covered by tests

estimated_async_time_secs_upper_bound = float(response.text)
estimated_async_time_secs_upper_bound_sum += (

Check warning on line 703 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L702-L703

Added lines #L702 - L703 were not covered by tests
estimated_async_time_secs_upper_bound
)

if verbose_level >= 1:
logging.info(

Check warning on line 708 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L707-L708

Added lines #L707 - L708 were not covered by tests
f"Queued asynchronous skeleton generation for one batch of root_ids: {rids_one_batch}"
)
logging.info(

Check warning on line 711 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L711

Added line #L711 was not covered by tests
f"Upper estimate to generate one batch of {len(rids_one_batch)} skeletons: {estimated_async_time_secs_upper_bound} seconds"
)

if estimated_async_time_secs_upper_bound_sum < 60:
estimate_time_str = (

Check warning on line 716 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L715-L716

Added lines #L715 - L716 were not covered by tests
f"{estimated_async_time_secs_upper_bound_sum:.0f} seconds"
)
elif estimated_async_time_secs_upper_bound_sum < 3600:
estimate_time_str = (

Check warning on line 720 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L719-L720

Added lines #L719 - L720 were not covered by tests
f"{(estimated_async_time_secs_upper_bound_sum / 60):.1f} minutes"
)
# With a 10000 skeleton limit, the maximum time is about 12 hours, so we don't need to check for more than that.
# elif estimated_async_time_secs_upper_bound_sum < 86400:
else:
estimate_time_str = (

Check warning on line 726 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L726

Added line #L726 was not covered by tests
f"{(estimated_async_time_secs_upper_bound_sum / 3600):.1f} hours"
)
# else:
# estimate_time_str = f"{(estimated_async_time_secs_upper_bound_sum / 86400):.2f} days"

return f"Upper estimate to generate {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound} seconds"
return f"Upper estimate to generate all {len(root_ids)} skeletons: {estimate_time_str}"

Check warning on line 732 in caveclient/skeletonservice.py

View check run for this annotation

Codecov / codecov/patch

caveclient/skeletonservice.py#L732

Added line #L732 was not covered by tests
Loading