Skip to content

Commit

Permalink
feat+perf: adds max_labels_per_shard to skeleton shard production
Browse files (browse the repository at this point in the history)
  • Loading branch information
william-silversmith committed Dec 3, 2022
1 parent bc07b07 commit df6e292
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
32 changes: 19 additions & 13 deletions igneous/task_creation/skeleton.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,8 @@
import numpy as np
from tqdm import tqdm

import shardcomputer

from cloudvolume import CloudVolume
from cloudvolume.lib import Vec, Bbox, max2, min2, xyzrange, find_closest_divisor, yellow, jsonify
from cloudvolume.datasource.precomputed.sharding import ShardingSpecification
Expand Down Expand Up @@ -273,20 +275,27 @@ def __iter__(self):
return GrapheneSkeletonMergeTaskIterator()

def create_sharded_skeleton_merge_tasks(
layer_path, dust_threshold, tick_threshold,
shard_index_bytes=2**13,
minishard_index_bytes=2**15,
min_shards=1,
minishard_index_encoding='gzip',
data_encoding='gzip',
max_cable_length=None, spatial_index_db=None
layer_path:str,
dust_threshold:int, tick_threshold:float,
shard_index_bytes:int = 2**13,
minishard_index_bytes:int = 2**15,
min_shards:int = 1,
minishard_index_encoding:str = 'gzip',
data_encoding:str = 'gzip',
max_cable_length:Optional[float] = None,
spatial_index_db:Optional[str] = None,
max_labels_per_shard:Optional[int] = None,
):
cv = CloudVolume(layer_path, progress=True, spatial_index_db=spatial_index_db)
cv.mip = cv.skeleton.meta.mip

# 17 sec to download for pinky100
all_labels = cv.skeleton.spatial_index.query(cv.bounds * cv.resolution)

if max_labels_per_shard is not None:
assert max_labels_per_shard >= 1
min_shards = max(int(np.ceil(len(all_labels) / max_labels_per_shard)), min_shards)

(shard_bits, minishard_bits, preshift_bits) = \
compute_shard_params_for_hashed(
num_labels=len(all_labels),
Expand All @@ -311,12 +320,9 @@ def create_sharded_skeleton_merge_tasks(
cv = CloudVolume(layer_path, progress=True, spatial_index_db=spatial_index_db)
cv.mip = cv.skeleton.meta.mip

# perf: ~36k hashes/sec
shardfn = lambda lbl: cv.skeleton.reader.spec.compute_shard_location(lbl).shard_number

shard_labels = defaultdict(list)
for label in tqdm(all_labels, desc="Hashes"):
shard_labels[shardfn(label)].append(label)
all_labels = np.fromiter(all_labels, dtype=np.uint64, count=len(all_labels))
shard_labels = shardcomputer.assign_labels_to_shards(all_labels, preshift_bits, shard_bits, minishard_bits)
del all_labels

cf = CloudFiles(cv.skeleton.meta.layerpath, progress=True)
files = (
Expand Down
5 changes: 4 additions & 1 deletion igneous_cli/cli.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1079,6 +1079,7 @@ def skeleton_merge(
@click.option('--tick-threshold', default=0, help="Remove small \"ticks\", or branches from the main skeleton one at a time from smallest to largest. Branches larger than this are preserved. Default: no elimination", type=float)
@click.option('--shard-index-bytes', default=2**13, help="Size in bytes to make the shard index.", type=int, show_default=True)
@click.option('--minishard-index-bytes', default=2**15, help="Size in bytes to make the minishard index.", type=int, show_default=True)
@click.option('--max-labels-per-shard', default=2000, help="Maximum average number of labels per a shard.", type=int, show_default=True)
@click.option('--min-shards', default=1, help="Minimum number of shards to generate. Excess shards can make it easier to parallelize the merge process.", type=int, show_default=True)
@click.option('--minishard-index-encoding', default="gzip", help="Minishard indices can be compressed. gzip or raw. Default: gzip")
@click.option('--data-encoding', default="gzip", help="Shard data can be compressed. gzip or raw. Default: gzip")
Expand All @@ -1088,7 +1089,8 @@ def skeleton_sharded_merge(
ctx, path, queue,
min_cable_length, max_cable_length,
tick_threshold,
shard_index_bytes, minishard_index_bytes, min_shards,
shard_index_bytes, minishard_index_bytes,
max_labels_per_shard, min_shards,
minishard_index_encoding, data_encoding,
spatial_index_db
):
Expand All @@ -1115,6 +1117,7 @@ def skeleton_sharded_merge(
minishard_index_encoding=minishard_index_encoding,
data_encoding=data_encoding,
spatial_index_db=spatial_index_db,
max_labels_per_shard=max_labels_per_shard,
)

parallel = int(ctx.obj.get("parallel", 1))
Expand Down

0 comments on commit df6e292

Please sign in to comment.