From c6b27dc0a1a315a55a819a0f467684ab0fc1c52b Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 24 Feb 2023 11:08:19 +0100 Subject: [PATCH 01/22] Create cache dir and file path getter. --- src/synpp/pipeline.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 031adf2..c1b21f0 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -221,6 +221,11 @@ def hash_name(name, config): else: return name +def get_cache_directory_path(working_directory, hash): + return "%s/%s.cache" % (working_directory, hash) + +def get_cache_file_path(working_directory, hash): + return "%s/%s.p" % (working_directory, hash) class ConfiguredStage: def __init__(self, instance, config, configuration_context): From 8e805cb6c3c010eaa4d2104c6ad683bda021f1f5 Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 24 Feb 2023 11:15:14 +0100 Subject: [PATCH 02/22] Use path getters. --- src/synpp/pipeline.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index c1b21f0..12c78ca 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -344,7 +344,7 @@ def stage(self, name, config = {}): return self.cache[dependency] else: if not dependency in self.dependency_cache: - with open("%s/%s.p" % (self.working_directory, dependency), "rb") as f: + with open(get_cache_file_path(self.working_directory, dependency), "rb") as f: self.logger.info("Loading cache for %s ..." % dependency) self.dependency_cache[dependency] = pickle.load(f) @@ -358,7 +358,7 @@ def path(self, name = None, config = {}): return self.cache_path dependency = self._get_dependency({ "descriptor": name, "config": config }) - return "%s/%s.cache" % (self.working_directory, dependency) + return get_cache_directory_path(self.working_directory, dependency) def set_info(self, name, value): self.stage_info[name] = value @@ -647,8 +647,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not working_directory is None: for hash in sorted_hashes: - directory_path = "%s/%s.cache" % (working_directory, hash) - file_path = "%s/%s.p" % (working_directory, hash) + directory_path = get_cache_directory_path(working_directory, hash) + file_path = get_cache_file_path(working_directory, hash) if os.path.exists(directory_path) and os.path.exists(file_path): cache_available.add(hash) @@ -710,9 +710,6 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # 4.5) Devalidate if cache is not existant if not working_directory is None: for hash in sorted_cached_hashes: - directory_path = "%s/%s.cache" % (working_directory, hash) - file_path = "%s/%s.p" % (working_directory, hash) - if not hash in cache_available: stale_hashes.add(hash) @@ -738,7 +735,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # 4.8) Manually devalidate stages for hash in sorted_cached_hashes: stage = registry[hash] - cache_path = "%s/%s.cache" % (working_directory, hash) + cache_path = get_cache_directory_path(working_directory, hash) context = ValidateContext(stage["config"], cache_path) validation_token = stage["wrapper"].validate(context) @@ -804,7 +801,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non stage_dependency_info[dependency_hash] = meta[dependency_hash]["info"] # Prepare cache path - cache_path = "%s/%s.cache" % (working_directory, hash) + cache_path = get_cache_directory_path(working_directory, hash) if not 
working_directory is None: if os.path.exists(cache_path): @@ -821,7 +818,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if working_directory is None: cache[hash] = result else: - with open("%s/%s.p" % (working_directory, hash), "wb+") as f: + with open(get_cache_file_path(working_directory, hash), "wb+") as f: logger.info("Writing cache for %s" % hash) pickle.dump(result, f, protocol=4) @@ -847,8 +844,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non ephemeral_counts[dependency_hash] -= 1 if ephemeral_counts[dependency_hash] == 0: - cache_directory_path = "%s/%s.cache" % (working_directory, dependency_hash) - cache_file_path = "%s/%s.p" % (working_directory, dependency_hash) + cache_directory_path = get_cache_directory_path(working_directory, dependency_hash) + cache_file_path = get_cache_file_path(working_directory, dependency_hash) rmtree(cache_directory_path) os.remove(cache_file_path) @@ -867,7 +864,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # Load remaining previously cached results for hash in required_hashes: if results[required_hashes.index(hash)] is None: - with open("%s/%s.p" % (working_directory, hash), "rb") as f: + with open(get_cache_file_path(working_directory, hash), "rb") as f: logger.info("Loading cache for %s ..." % hash) results[required_hashes.index(hash)] = pickle.load(f) From 70830d182871cf80cca468f29c26a36c743c9524 Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 24 Feb 2023 11:15:46 +0100 Subject: [PATCH 03/22] Clean parameter names of getters. --- src/synpp/pipeline.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 12c78ca..baadcad 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -221,11 +221,11 @@ def hash_name(name, config): else: return name -def get_cache_directory_path(working_directory, hash): - return "%s/%s.cache" % (working_directory, hash) +def get_cache_directory_path(working_directory, stage_hash): + return "%s/%s.cache" % (working_directory, stage_hash) -def get_cache_file_path(working_directory, hash): - return "%s/%s.p" % (working_directory, hash) +def get_cache_file_path(working_directory, stage_hash): + return "%s/%s.p" % (working_directory, stage_hash) class ConfiguredStage: def __init__(self, instance, config, configuration_context): From 16a5821475e27563062b5c097c5c0d67f3e703fb Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 24 Feb 2023 12:17:17 +0100 Subject: [PATCH 04/22] clean --- src/synpp/pipeline.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index baadcad..0a677fd 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -785,17 +785,6 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non logger.info("Executing stage %s ..." 
% hash) stage = registry[hash] - # Load the dependencies, either from cache or from file - #stage_dependencies = [] - #stage_dependency_info = {} - - #if name in dependencies: - # stage_dependencies = dependencies[name] - # - # for parent in stage_dependencies: - # stage_dependency_info[parent] = meta[parent]["info"] - #stage_dependencies = - stage_dependency_info = {} for dependency_hash in stage["dependencies"]: stage_dependency_info[dependency_hash] = meta[dependency_hash]["info"] From b12f7bcc7a4ec7cf0295814fb744a0cb6e3adc06 Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 24 Feb 2023 17:33:22 +0100 Subject: [PATCH 05/22] remove pipeline json file --- src/synpp/pipeline.py | 349 +++++++++++++++++++----------------------- 1 file changed, 157 insertions(+), 192 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 0a677fd..42468c3 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -64,10 +64,10 @@ class NoDefaultValue: class StageInstance: - def __init__(self, instance, name, module_hash): + def __init__(self, instance, name, source_code): self.instance = instance self.name = name - self.module_hash = module_hash + self.source_code = source_code if not hasattr(self.instance, "execute"): raise RuntimeError("Stage %s does not have execute method" % self.name) @@ -88,11 +88,8 @@ def execute(self, context): return self.instance.execute(context) -def get_stage_hash(descriptor): - source = inspect.getsource(descriptor) - hash = hashlib.md5() - hash.update(source.encode("utf-8")) - return hash.hexdigest() +def get_source_code(descriptor): + return inspect.getsource(descriptor) def synpp_import_module(name, package=None, externals={}): @@ -133,25 +130,25 @@ def resolve_stage(descriptor, externals: dict = {}, aliases: dict = {}): return None # definitely not a stage if inspect.ismodule(descriptor): - stage_hash = get_stage_hash(descriptor) - return StageInstance(descriptor, descriptor.__name__, stage_hash) + source_code = get_source_code(descriptor) + return StageInstance(descriptor, descriptor.__name__, source_code) if inspect.isclass(descriptor): - stage_hash = get_stage_hash(descriptor) - return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), stage_hash) + source_code = get_source_code(descriptor) + return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), source_code) if inspect.isfunction(descriptor): if not hasattr(descriptor, 'stage_params'): raise PipelineError("Functions need to be decorated with @synpp.stage in order to be used in the pipeline.") function_stage = DecoratedStage(execute_func=descriptor, stage_params=descriptor.stage_params) - stage_hash = get_stage_hash(descriptor) - return StageInstance(function_stage, "%s.%s" % (descriptor.__module__, descriptor.__name__), stage_hash) + source_code = get_source_code(descriptor) + return StageInstance(function_stage, "%s.%s" % (descriptor.__module__, descriptor.__name__), source_code) if hasattr(descriptor, 'execute'): # Last option: arbitrary object which looks like a stage clazz = descriptor.__class__ - stage_hash = get_stage_hash(clazz) - return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), stage_hash) + source_code = get_source_code(clazz) + return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), source_code) # couldn't resolve stage (this is something else) return None @@ -213,19 +210,30 @@ def configure_name(name, config): return "%s(%s)" % (name, ",".join(values)) -def 
hash_name(name, config): +def get_stage_id(stage_name, config): + name = stage_name if len(config) > 0: - hash = hashlib.md5() - hash.update(json.dumps(config, sort_keys = True).encode("utf-8")) - return "%s__%s" % (name, hash.hexdigest()) - else: - return name + encoded_config = json.dumps(config, sort_keys = True).encode("utf-8") + config_digest = hashlib.md5(encoded_config).hexdigest() + name += "__" + config_digest + return name + +def get_cache_prefix(stage_id, source_code): + cache_prefix = stage_id + "__" + hashlib.md5(source_code.encode()).hexdigest() + return cache_prefix -def get_cache_directory_path(working_directory, stage_hash): - return "%s/%s.cache" % (working_directory, stage_hash) +def get_cache_id(stage_id, source_code, validation_token): + cache_id = get_cache_prefix(stage_id, source_code) + "__" + str(validation_token) + return cache_id -def get_cache_file_path(working_directory, stage_hash): - return "%s/%s.p" % (working_directory, stage_hash) +def get_cache_directory_path(working_directory, cache_id): + return "%s/%s.cache" % (working_directory, cache_id) + +def get_cache_file_path(working_directory, cache_id): + return "%s/%s.p" % (working_directory, cache_id) + +def get_info_path(working_directory, cache_id): + return "%s/%s.info" % (working_directory, cache_id) class ConfiguredStage: def __init__(self, instance, config, configuration_context): @@ -234,7 +242,7 @@ def __init__(self, instance, config, configuration_context): self.configuration_context = configuration_context self.configured_name = configure_name(instance.name, configuration_context.required_config) - self.hashed_name = hash_name(instance.name, configuration_context.required_config) + self.hashed_name = get_stage_id(instance.name, configuration_context.required_config) def configure(self, context): if hasattr(self.instance, "configure"): @@ -315,7 +323,7 @@ def path(self): class ExecuteContext(Context): - def __init__(self, required_config, required_stages, aliases, working_directory, dependencies, cache_path, pipeline_config, logger, cache, dependency_info): + def __init__(self, required_config, required_stages, aliases, working_directory, dependencies, cache_path, pipeline_config, logger, cache, dependency_info, cache_ids): self.required_config = required_config self.working_directory = working_directory self.dependencies = dependencies @@ -328,6 +336,7 @@ def __init__(self, required_config, required_stages, aliases, working_directory, self.dependency_info = dependency_info self.aliases = aliases self.required_stages = required_stages + self.cache_ids = cache_ids self.progress_context = None @@ -344,7 +353,7 @@ def stage(self, name, config = {}): return self.cache[dependency] else: if not dependency in self.dependency_cache: - with open(get_cache_file_path(self.working_directory, dependency), "rb") as f: + with open(get_cache_file_path(self.working_directory, self.cache_ids[dependency]), "rb") as f: self.logger.info("Loading cache for %s ..." 
% dependency) self.dependency_cache[dependency] = pickle.load(f) @@ -358,7 +367,7 @@ def path(self, name = None, config = {}): return self.cache_path dependency = self._get_dependency({ "descriptor": name, "config": config }) - return get_cache_directory_path(self.working_directory, dependency) + return get_cache_directory_path(self.working_directory, self.cache_ids[dependency]) def set_info(self, name, value): self.stage_info[name] = value @@ -442,7 +451,7 @@ def process_stages(definitions, global_config, externals={}, aliases={}): }) # Check for cycles - cycle_hash = hash_name(definition["wrapper"].name, definition["config"]) + cycle_hash = get_stage_id(definition["wrapper"].name, definition["config"]) if "cycle_hashes" in definition and cycle_hash in definition["cycle_hashes"]: print(definition["cycle_hashes"]) @@ -536,7 +545,7 @@ def process_stages(definitions, global_config, externals={}, aliases={}): required_hashes = {} for stage in stages: - stage["hash"] = hash_name(stage["wrapper"].name, stage["config"]) + stage["hash"] = get_stage_id(stage["wrapper"].name, stage["config"]) if "required-index" in stage: index = stage["required-index"] @@ -565,17 +574,6 @@ def process_stages(definitions, global_config, externals={}, aliases={}): return registry - -def update_json(meta, working_directory): - if os.path.exists("%s/pipeline.json" % working_directory): - shutil.move("%s/pipeline.json" % working_directory, "%s/pipeline.json.bk" % working_directory) - - with open("%s/pipeline.json.new" % working_directory, "w+") as f: - json.dump(meta, f) - - shutil.move("%s/pipeline.json.new" % working_directory, "%s/pipeline.json" % working_directory) - - def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False, logger = logging.getLogger("synpp"), rerun_required=True, ensure_working_directory=False, externals = {}, aliases = {}): @@ -607,8 +605,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non graph = nx.DiGraph() flowchart = nx.MultiDiGraph() # graph to later plot - for hash in registry.keys(): - graph.add_node(hash) + for stage_id in registry.keys(): + graph.add_node(stage_id) for stage in registry.values(): stage_name = stage['descriptor'] @@ -616,10 +614,10 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not flowchart.has_node(stage_name): flowchart.add_node(stage_name) - for hash in stage["dependencies"]: - graph.add_edge(hash, stage["hash"]) + for stage_id in stage["dependencies"]: + graph.add_edge(stage_id, stage["hash"]) - dependency_name = registry.get(hash)['descriptor'] + dependency_name = registry.get(stage_id)['descriptor'] if not flowchart.has_edge(dependency_name, stage_name): flowchart.add_edge(dependency_name, stage_name) @@ -642,135 +640,111 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non sorted_hashes = list(nx.topological_sort(graph)) + # Compute cache prefixes by appending source code digest + source_codes = dict() + for stage_id in sorted_hashes: + source_codes[stage_id] = "" + for dependency_stage_id in nx.ancestors(graph, stage_id): + source_codes[stage_id] += registry[dependency_stage_id]["wrapper"].source_code + source_codes[stage_id] += registry[stage_id]["wrapper"].source_code + # Check where cache is available cache_available = set() + stored_validation_tokens = {} if not working_directory is None: - for hash in sorted_hashes: - directory_path = get_cache_directory_path(working_directory, hash) - file_path 
= get_cache_file_path(working_directory, hash) - - if os.path.exists(directory_path) and os.path.exists(file_path): - cache_available.add(hash) - registry[hash]["ephemeral"] = False + for stage_id in sorted_hashes: + prefix = get_cache_prefix(stage_id, source_codes[stage_id]) + prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")] + if prefixed: + stored_validation_tokens[stage_id] = [filename.split("__")[-1] for filename in prefixed] + cache_available.add(stage_id) + registry[stage_id]["ephemeral"] = False # Set up ephemeral stage counts ephemeral_counts = {} for stage in registry.values(): - for hash in stage["dependencies"]: - dependency = registry[hash] - - if dependency["ephemeral"] and not hash in cache_available: - if not hash in ephemeral_counts: - ephemeral_counts[hash] = 0 - - ephemeral_counts[hash] += 1 + for stage_id in stage["dependencies"]: + dependency = registry[stage_id] - # 3) Load information about stages - meta = {} + if dependency["ephemeral"] and not stage_id in cache_available: + if not stage_id in ephemeral_counts: + ephemeral_counts[stage_id] = 0 - if not working_directory is None: - try: - with open("%s/pipeline.json" % working_directory) as f: - meta = json.load(f) - logger.info("Found pipeline metadata in %s/pipeline.json" % working_directory) - except FileNotFoundError: - logger.info("Did not find pipeline metadata in %s/pipeline.json" % working_directory) + ephemeral_counts[stage_id] += 1 # 4) Devalidate stages - sorted_cached_hashes = sorted_hashes - ephemeral_counts.keys() + sorted_cached_hashes = sorted_hashes stale_hashes = set() - # 4.1) Devalidate if they are required (optional, otherwise will reload from cache) - if rerun_required: - stale_hashes.update(required_hashes) - - # 4.2) Devalidate if not in meta - for hash in sorted_cached_hashes: - if not hash in meta: - stale_hashes.add(hash) - - # 4.3) Devalidate if configuration values have changed - # This devalidation step is obsolete since we have implicit config parameters + # Get current validation tokens + current_validation_tokens = { + stage_id: + str( + registry[stage_id]["wrapper"].validate( + ValidateContext(registry[stage_id]["config"], get_cache_directory_path(working_directory, stage_id)) + ) + ) for stage_id in sorted_cached_hashes + } - # 4.4) Devalidate if module hash of a stage has changed - for hash in sorted_cached_hashes: - if hash in meta: - if not "module_hash" in meta[hash]: - stale_hashes.add(hash) # Backwards compatibility + # Cache mapper between stage id and cache id. 
+ cache_ids = {stage_id: get_cache_id(stage_id, source_codes[stage_id], current_validation_tokens[stage_id]) for stage_id in sorted_cached_hashes} - else: - previous_module_hash = meta[hash]["module_hash"] - current_module_hash = registry[hash]["wrapper"].module_hash + # 4.8) Manually devalidate stages + for stage_id in sorted_cached_hashes: + if stage_id not in stored_validation_tokens or current_validation_tokens[stage_id] not in stored_validation_tokens[stage_id]: + print(f"Devalidation {stage_id}: Manually devalidate") + stale_hashes.add(stage_id) - if previous_module_hash != current_module_hash: - stale_hashes.add(hash) + # 4.1) Devalidate if they are required (optional, otherwise will reload from cache) + if rerun_required: + print(f"Devalidation {required_hashes}: Requirement") + stale_hashes.update(required_hashes) # 4.5) Devalidate if cache is not existant - if not working_directory is None: - for hash in sorted_cached_hashes: - if not hash in cache_available: - stale_hashes.add(hash) + for stage_id in sorted_cached_hashes: + if not stage_id in cache_available: + print(f"Devalidation {stage_id}: No cache") + stale_hashes.add(stage_id) # 4.6) Devalidate if parent has been updated - for hash in sorted_cached_hashes: - if not hash in stale_hashes and hash in meta: - for dependency_hash, dependency_update in meta[hash]["dependencies"].items(): - if not dependency_hash in meta: - stale_hashes.add(hash) - else: - if meta[dependency_hash]["updated"] > dependency_update: - stale_hashes.add(hash) - - # 4.7) Devalidate if parents are not the same anymore - for hash in sorted_cached_hashes: - if not hash in stale_hashes and hash in meta: - cached_hashes = set(meta[hash]["dependencies"].keys()) - current_hashes = set(registry[hash]["dependencies"] if "dependencies" in registry[hash] else []) - - if not cached_hashes == current_hashes: - stale_hashes.add(hash) - - # 4.8) Manually devalidate stages - for hash in sorted_cached_hashes: - stage = registry[hash] - cache_path = get_cache_directory_path(working_directory, hash) - context = ValidateContext(stage["config"], cache_path) - - validation_token = stage["wrapper"].validate(context) - existing_token = meta[hash]["validation_token"] if hash in meta and "validation_token" in meta[hash] else None + if working_directory is not None: + for stage_id in sorted_cached_hashes: + if not stage_id in stale_hashes: + ctime = os.stat(get_cache_file_path(working_directory, cache_ids[stage_id])).st_mtime_ns + # print(f"Cached {stage_id}: {ctime}") + for dependency_stage_id in nx.ancestors(graph, stage_id): + dependency_ctime = os.stat(get_cache_file_path(working_directory, cache_ids[dependency_stage_id])).st_mtime_ns + if dependency_ctime > ctime: + print(f"Devalidation {stage_id}: Parent {dependency_stage_id} updated ({dependency_ctime} > {ctime})") + stale_hashes.add(stage_id) + break + if dependency_ctime == ctime: + print(f"{stage_id} and {dependency_stage_id}: {dependency_ctime} == {ctime}") - if not validation_token == existing_token: - stale_hashes.add(hash) # 4.9) Devalidate descendants of devalidated stages - for hash in set(stale_hashes): - for descendant_hash in nx.descendants(graph, hash): + for stage_id in set(stale_hashes): + for descendant_hash in nx.descendants(graph, stage_id): if not descendant_hash in stale_hashes: + print(f"Devalidation {stage_id}: Descendent of devalidated") stale_hashes.add(descendant_hash) # 4.10) Devalidate ephemeral stages if necessary pending = set(stale_hashes) while len(pending) > 0: - for dependency_hash in 
registry[pending.pop()]["dependencies"]: - if registry[dependency_hash]["ephemeral"]: - if not dependency_hash in stale_hashes: - pending.add(dependency_hash) + for dependency_stage_id in registry[pending.pop()]["dependencies"]: + if registry[dependency_stage_id]["ephemeral"]: + if not dependency_stage_id in stale_hashes: + pending.add(dependency_stage_id) - stale_hashes.add(dependency_hash) + print(f"Devalidation {stage_id}: Ephemeral") + stale_hashes.add(dependency_stage_id) logger.info("Devalidating %d stages:" % len(stale_hashes)) - for hash in stale_hashes: logger.info("- %s" % hash) - - # 5) Reset meta information - for hash in stale_hashes: - if hash in meta: - del meta[hash] - - if not working_directory is None: - update_json(meta, working_directory) + for stage_id in stale_hashes: logger.info("- %s" % stage_id) logger.info("Successfully reset meta data") @@ -780,69 +754,60 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non progress = 0 - for hash in sorted_hashes: - if hash in stale_hashes: - logger.info("Executing stage %s ..." % hash) - stage = registry[hash] + infos = {} + for stage_id in sorted_hashes: + if stage_id in stale_hashes: + logger.info("Executing stage %s ..." % stage_id) + stage = registry[stage_id] - stage_dependency_info = {} - for dependency_hash in stage["dependencies"]: - stage_dependency_info[dependency_hash] = meta[dependency_hash]["info"] + for dependency_stage_id in stage["dependencies"]: + info_path = get_info_path(working_directory, cache_ids[dependency_stage_id]) + if dependency_stage_id not in infos and working_directory is not None: + with open(info_path, "rb") as f: + infos[dependency_stage_id] = pickle.load(f) # Prepare cache path - cache_path = get_cache_directory_path(working_directory, hash) + cache_id = cache_ids[stage_id] + cache_path = get_cache_directory_path(working_directory, cache_id) if not working_directory is None: if os.path.exists(cache_path): rmtree(cache_path) os.mkdir(cache_path) - - context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, stage_dependency_info) + context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, infos, cache_ids) result = stage["wrapper"].execute(context) - validation_token = stage["wrapper"].validate(ValidateContext(stage["config"], cache_path)) - if hash in required_hashes: - results[required_hashes.index(hash)] = result + if stage_id in required_hashes: + results[required_hashes.index(stage_id)] = result - if working_directory is None: - cache[hash] = result - else: - with open(get_cache_file_path(working_directory, hash), "wb+") as f: - logger.info("Writing cache for %s" % hash) + cache[stage_id] = result + if working_directory is not None: + with open(get_cache_file_path(working_directory, cache_id), "wb+") as f: + logger.info("Writing cache for %s" % stage_id) pickle.dump(result, f, protocol=4) - - # Update meta information - meta[hash] = { - "config": stage["config"], - "updated": datetime.datetime.utcnow().timestamp(), - "dependencies": { - dependency_hash: meta[dependency_hash]["updated"] for dependency_hash in stage["dependencies"] - }, - "info": context.stage_info, - "validation_token": validation_token, - "module_hash": stage["wrapper"].module_hash - } - - if not working_directory is None: - update_json(meta, working_directory) + # 
print(f"{stage_id}: {time.time_ns()}") + with open(get_info_path(working_directory, cache_id), "wb+") as f: + logger.info("Writing info for %s" % stage_id) + pickle.dump(context.stage_info, f, protocol=4) + infos[stage_id] = context.stage_info # Clear cache for ephemeral stages if they are no longer needed if not working_directory is None: - for dependency_hash in stage["dependencies"]: - if dependency_hash in ephemeral_counts: - ephemeral_counts[dependency_hash] -= 1 + for dependency_stage_id in stage["dependencies"]: + if dependency_stage_id in ephemeral_counts: + ephemeral_counts[dependency_stage_id] -= 1 - if ephemeral_counts[dependency_hash] == 0: - cache_directory_path = get_cache_directory_path(working_directory, dependency_hash) - cache_file_path = get_cache_file_path(working_directory, dependency_hash) + if ephemeral_counts[dependency_stage_id] == 0: + cache_directory_path = get_cache_directory_path(working_directory, cache_ids[dependency_stage_id]) + cache_file_path = get_cache_file_path(working_directory, cache_ids[dependency_stage_id]) rmtree(cache_directory_path) os.remove(cache_file_path) - logger.info("Removed ephemeral %s." % dependency_hash) - del ephemeral_counts[dependency_hash] + logger.info("Removed ephemeral %s." % dependency_stage_id) + del ephemeral_counts[dependency_stage_id] - logger.info("Finished running %s." % hash) + logger.info("Finished running %s." % stage_id) progress += 1 logger.info("Pipeline progress: %d/%d (%.2f%%)" % ( @@ -851,22 +816,22 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not rerun_required: # Load remaining previously cached results - for hash in required_hashes: - if results[required_hashes.index(hash)] is None: - with open(get_cache_file_path(working_directory, hash), "rb") as f: - logger.info("Loading cache for %s ..." % hash) - results[required_hashes.index(hash)] = pickle.load(f) + for stage_id in required_hashes: + if results[required_hashes.index(stage_id)] is None: + with open(get_cache_file_path(working_directory, cache_ids[stage_id]), "rb") as f: + logger.info("Loading cache for %s ..." % stage_id) + results[required_hashes.index(stage_id)] = pickle.load(f) if verbose: - info = {} + flattened_infos = {} - for hash in sorted(meta.keys()): - info.update(meta[hash]["info"]) + for stage_id in infos.keys(): + flattened_infos.update(infos[stage_id]) return { "results": results, "stale": stale_hashes, - "info": info, + "info": flattened_infos, "flowchart": node_link_data(flowchart) } else: From 77fe7f65556aba6fae122316ac18a4842a60a3dd Mon Sep 17 00:00:00 2001 From: Aina Date: Wed, 22 Mar 2023 14:39:50 +0100 Subject: [PATCH 06/22] Undo refactoring to make modifications clearer. 
--- src/synpp/pipeline.py | 186 ++++++++++++++++++++++-------------------- 1 file changed, 97 insertions(+), 89 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 42468c3..254e36e 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -64,10 +64,10 @@ class NoDefaultValue: class StageInstance: - def __init__(self, instance, name, source_code): + def __init__(self, instance, name, module_hash): self.instance = instance self.name = name - self.source_code = source_code + self.source_code = module_hash if not hasattr(self.instance, "execute"): raise RuntimeError("Stage %s does not have execute method" % self.name) @@ -210,8 +210,7 @@ def configure_name(name, config): return "%s(%s)" % (name, ",".join(values)) -def get_stage_id(stage_name, config): - name = stage_name +def hash_name(name, config): if len(config) > 0: encoded_config = json.dumps(config, sort_keys = True).encode("utf-8") config_digest = hashlib.md5(encoded_config).hexdigest() @@ -242,7 +241,7 @@ def __init__(self, instance, config, configuration_context): self.configuration_context = configuration_context self.configured_name = configure_name(instance.name, configuration_context.required_config) - self.hashed_name = get_stage_id(instance.name, configuration_context.required_config) + self.hashed_name = hash_name(instance.name, configuration_context.required_config) def configure(self, context): if hasattr(self.instance, "configure"): @@ -451,7 +450,7 @@ def process_stages(definitions, global_config, externals={}, aliases={}): }) # Check for cycles - cycle_hash = get_stage_id(definition["wrapper"].name, definition["config"]) + cycle_hash = hash_name(definition["wrapper"].name, definition["config"]) if "cycle_hashes" in definition and cycle_hash in definition["cycle_hashes"]: print(definition["cycle_hashes"]) @@ -545,7 +544,7 @@ def process_stages(definitions, global_config, externals={}, aliases={}): required_hashes = {} for stage in stages: - stage["hash"] = get_stage_id(stage["wrapper"].name, stage["config"]) + stage["hash"] = hash_name(stage["wrapper"].name, stage["config"]) if "required-index" in stage: index = stage["required-index"] @@ -605,8 +604,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non graph = nx.DiGraph() flowchart = nx.MultiDiGraph() # graph to later plot - for stage_id in registry.keys(): - graph.add_node(stage_id) + for hash in registry.keys(): + graph.add_node(hash) for stage in registry.values(): stage_name = stage['descriptor'] @@ -614,10 +613,10 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not flowchart.has_node(stage_name): flowchart.add_node(stage_name) - for stage_id in stage["dependencies"]: - graph.add_edge(stage_id, stage["hash"]) + for hash in stage["dependencies"]: + graph.add_edge(hash, stage["hash"]) - dependency_name = registry.get(stage_id)['descriptor'] + dependency_name = registry.get(hash)['descriptor'] if not flowchart.has_edge(dependency_name, stage_name): flowchart.add_edge(dependency_name, stage_name) @@ -642,37 +641,37 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # Compute cache prefixes by appending source code digest source_codes = dict() - for stage_id in sorted_hashes: - source_codes[stage_id] = "" - for dependency_stage_id in nx.ancestors(graph, stage_id): - source_codes[stage_id] += registry[dependency_stage_id]["wrapper"].source_code - source_codes[stage_id] += registry[stage_id]["wrapper"].source_code + for hash in 
sorted_hashes: + source_codes[hash] = "" + for dependency_hash in nx.ancestors(graph, hash): + source_codes[hash] += registry[dependency_hash]["wrapper"].source_code + source_codes[hash] += registry[hash]["wrapper"].source_code # Check where cache is available cache_available = set() stored_validation_tokens = {} if not working_directory is None: - for stage_id in sorted_hashes: - prefix = get_cache_prefix(stage_id, source_codes[stage_id]) + for hash in sorted_hashes: + prefix = get_cache_prefix(hash, source_codes[hash]) prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")] if prefixed: - stored_validation_tokens[stage_id] = [filename.split("__")[-1] for filename in prefixed] - cache_available.add(stage_id) - registry[stage_id]["ephemeral"] = False + stored_validation_tokens[hash] = [filename.split("__")[-1] for filename in prefixed] + cache_available.add(hash) + registry[hash]["ephemeral"] = False # Set up ephemeral stage counts ephemeral_counts = {} for stage in registry.values(): - for stage_id in stage["dependencies"]: - dependency = registry[stage_id] + for hash in stage["dependencies"]: + dependency = registry[hash] - if dependency["ephemeral"] and not stage_id in cache_available: - if not stage_id in ephemeral_counts: - ephemeral_counts[stage_id] = 0 + if dependency["ephemeral"] and not hash in cache_available: + if not hash in ephemeral_counts: + ephemeral_counts[hash] = 0 - ephemeral_counts[stage_id] += 1 + ephemeral_counts[hash] += 1 # 4) Devalidate stages sorted_cached_hashes = sorted_hashes @@ -692,10 +691,10 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non cache_ids = {stage_id: get_cache_id(stage_id, source_codes[stage_id], current_validation_tokens[stage_id]) for stage_id in sorted_cached_hashes} # 4.8) Manually devalidate stages - for stage_id in sorted_cached_hashes: - if stage_id not in stored_validation_tokens or current_validation_tokens[stage_id] not in stored_validation_tokens[stage_id]: - print(f"Devalidation {stage_id}: Manually devalidate") - stale_hashes.add(stage_id) + for hash in sorted_cached_hashes: + if hash not in stored_validation_tokens or current_validation_tokens[hash] not in stored_validation_tokens[hash]: + print(f"Devalidation {hash}: Manually devalidate") + stale_hashes.add(hash) # 4.1) Devalidate if they are required (optional, otherwise will reload from cache) if rerun_required: @@ -703,48 +702,46 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non stale_hashes.update(required_hashes) # 4.5) Devalidate if cache is not existant - for stage_id in sorted_cached_hashes: - if not stage_id in cache_available: - print(f"Devalidation {stage_id}: No cache") - stale_hashes.add(stage_id) + for hash in sorted_cached_hashes: + if not hash in cache_available: + print(f"Devalidation {hash}: No cache") + stale_hashes.add(hash) # 4.6) Devalidate if parent has been updated if working_directory is not None: - for stage_id in sorted_cached_hashes: - if not stage_id in stale_hashes: - ctime = os.stat(get_cache_file_path(working_directory, cache_ids[stage_id])).st_mtime_ns + for hash in sorted_cached_hashes: + if not hash in stale_hashes: + ctime = os.stat(get_cache_file_path(working_directory, cache_ids[hash])).st_mtime_ns # print(f"Cached {stage_id}: {ctime}") - for dependency_stage_id in nx.ancestors(graph, stage_id): - dependency_ctime = os.stat(get_cache_file_path(working_directory, 
cache_ids[dependency_stage_id])).st_mtime_ns + for dependency_hash in nx.ancestors(graph, hash): + dependency_ctime = os.stat(get_cache_file_path(working_directory, cache_ids[dependency_hash])).st_mtime_ns if dependency_ctime > ctime: - print(f"Devalidation {stage_id}: Parent {dependency_stage_id} updated ({dependency_ctime} > {ctime})") - stale_hashes.add(stage_id) + print(f"Devalidation {hash}: Parent {dependency_hash} updated ({dependency_ctime} > {ctime})") + stale_hashes.add(hash) break if dependency_ctime == ctime: - print(f"{stage_id} and {dependency_stage_id}: {dependency_ctime} == {ctime}") + print(f"{hash} and {dependency_hash}: {dependency_ctime} == {ctime}") # 4.9) Devalidate descendants of devalidated stages - for stage_id in set(stale_hashes): - for descendant_hash in nx.descendants(graph, stage_id): + for hash in set(stale_hashes): + for descendant_hash in nx.descendants(graph, hash): if not descendant_hash in stale_hashes: - print(f"Devalidation {stage_id}: Descendent of devalidated") stale_hashes.add(descendant_hash) # 4.10) Devalidate ephemeral stages if necessary pending = set(stale_hashes) while len(pending) > 0: - for dependency_stage_id in registry[pending.pop()]["dependencies"]: - if registry[dependency_stage_id]["ephemeral"]: - if not dependency_stage_id in stale_hashes: - pending.add(dependency_stage_id) + for dependency_hash in registry[pending.pop()]["dependencies"]: + if registry[dependency_hash]["ephemeral"]: + if not dependency_hash in stale_hashes: + pending.add(dependency_hash) - print(f"Devalidation {stage_id}: Ephemeral") - stale_hashes.add(dependency_stage_id) + stale_hashes.add(dependency_hash) logger.info("Devalidating %d stages:" % len(stale_hashes)) - for stage_id in stale_hashes: logger.info("- %s" % stage_id) + for hash in stale_hashes: logger.info("- %s" % hash) logger.info("Successfully reset meta data") @@ -755,59 +752,70 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non progress = 0 infos = {} - for stage_id in sorted_hashes: - if stage_id in stale_hashes: - logger.info("Executing stage %s ..." % stage_id) - stage = registry[stage_id] - - for dependency_stage_id in stage["dependencies"]: - info_path = get_info_path(working_directory, cache_ids[dependency_stage_id]) - if dependency_stage_id not in infos and working_directory is not None: + for hash in sorted_hashes: + if hash in stale_hashes: + logger.info("Executing stage %s ..." 
% hash) + stage = registry[hash] + + # Load the dependencies, either from cache or from file + #stage_dependencies = [] + #stage_dependency_info = {} + + #if name in dependencies: + # stage_dependencies = dependencies[name] + # + # for parent in stage_dependencies: + # stage_dependency_info[parent] = meta[parent]["info"] + #stage_dependencies = + + for dependency_hash in stage["dependencies"]: + info_path = get_info_path(working_directory, cache_ids[dependency_hash]) + if dependency_hash not in infos and working_directory is not None: with open(info_path, "rb") as f: - infos[dependency_stage_id] = pickle.load(f) + infos[dependency_hash] = pickle.load(f) # Prepare cache path - cache_id = cache_ids[stage_id] + cache_id = cache_ids[hash] cache_path = get_cache_directory_path(working_directory, cache_id) if not working_directory is None: if os.path.exists(cache_path): rmtree(cache_path) os.mkdir(cache_path) + context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, infos, cache_ids) result = stage["wrapper"].execute(context) - if stage_id in required_hashes: - results[required_hashes.index(stage_id)] = result + if hash in required_hashes: + results[required_hashes.index(hash)] = result - cache[stage_id] = result + cache[hash] = result if working_directory is not None: with open(get_cache_file_path(working_directory, cache_id), "wb+") as f: - logger.info("Writing cache for %s" % stage_id) + logger.info("Writing cache for %s" % hash) pickle.dump(result, f, protocol=4) - # print(f"{stage_id}: {time.time_ns()}") with open(get_info_path(working_directory, cache_id), "wb+") as f: - logger.info("Writing info for %s" % stage_id) + logger.info("Writing info for %s" % hash) pickle.dump(context.stage_info, f, protocol=4) - infos[stage_id] = context.stage_info + infos[hash] = context.stage_info # Clear cache for ephemeral stages if they are no longer needed if not working_directory is None: - for dependency_stage_id in stage["dependencies"]: - if dependency_stage_id in ephemeral_counts: - ephemeral_counts[dependency_stage_id] -= 1 + for dependency_hash in stage["dependencies"]: + if dependency_hash in ephemeral_counts: + ephemeral_counts[dependency_hash] -= 1 - if ephemeral_counts[dependency_stage_id] == 0: - cache_directory_path = get_cache_directory_path(working_directory, cache_ids[dependency_stage_id]) - cache_file_path = get_cache_file_path(working_directory, cache_ids[dependency_stage_id]) + if ephemeral_counts[dependency_hash] == 0: + cache_directory_path = get_cache_directory_path(working_directory, cache_ids[dependency_hash]) + cache_file_path = get_cache_file_path(working_directory, cache_ids[dependency_hash]) rmtree(cache_directory_path) os.remove(cache_file_path) - logger.info("Removed ephemeral %s." % dependency_stage_id) - del ephemeral_counts[dependency_stage_id] + logger.info("Removed ephemeral %s." % dependency_hash) + del ephemeral_counts[dependency_hash] - logger.info("Finished running %s." % stage_id) + logger.info("Finished running %s." 
% hash) progress += 1 logger.info("Pipeline progress: %d/%d (%.2f%%)" % ( @@ -816,22 +824,22 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not rerun_required: # Load remaining previously cached results - for stage_id in required_hashes: - if results[required_hashes.index(stage_id)] is None: - with open(get_cache_file_path(working_directory, cache_ids[stage_id]), "rb") as f: - logger.info("Loading cache for %s ..." % stage_id) - results[required_hashes.index(stage_id)] = pickle.load(f) + for hash in required_hashes: + if results[required_hashes.index(hash)] is None: + with open(get_cache_file_path(working_directory, cache_ids[hash]), "rb") as f: + logger.info("Loading cache for %s ..." % hash) + results[required_hashes.index(hash)] = pickle.load(f) if verbose: - flattened_infos = {} + info = {} - for stage_id in infos.keys(): - flattened_infos.update(infos[stage_id]) + for hash in infos.keys(): + info.update(infos[hash]) return { "results": results, "stale": stale_hashes, - "info": flattened_infos, + "info": info, "flowchart": node_link_data(flowchart) } else: From 79a8648e3090d8cbe49cbf63c35a84c4fd81d4b9 Mon Sep 17 00:00:00 2001 From: Aina Date: Wed, 22 Mar 2023 14:51:01 +0100 Subject: [PATCH 07/22] add sleep between runs for devalidation by parent --- tests/test_devalidate.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_devalidate.py b/tests/test_devalidate.py index e9d2966..457dec8 100644 --- a/tests/test_devalidate.py +++ b/tests/test_devalidate.py @@ -1,5 +1,6 @@ import synpp from pytest import raises +import time def test_devalidate_by_config(tmpdir): working_directory = tmpdir.mkdir("sub") @@ -63,6 +64,8 @@ def test_devalidate_by_parent(tmpdir): assert "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + time.sleep(0.1) + result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.C" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -72,6 +75,8 @@ def test_devalidate_by_parent(tmpdir): assert not "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + time.sleep(0.1) + result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.A2" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -81,6 +86,8 @@ def test_devalidate_by_parent(tmpdir): assert not "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert not "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + time.sleep(0.1) + result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.C" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) From d7393b4708189f469a9108a0e33e0ccc7381a0ce Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 7 Apr 2023 12:37:52 +0200 Subject: [PATCH 08/22] Remove redundant dictionary and set. 
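
The separate cache_available set and stored_validation_tokens dict carried the same
information, so they are folded into a single cache_available dict mapping each stage
hash to the validation tokens already present on disk. For illustration only (this
sketch is not part of the diff), the directory scan behind that mapping looks roughly
like:

    import os

    def sketch_stored_tokens(working_directory, cache_prefix):
        # every "<prefix>...<token>.p" file in the working directory is a usable cache entry
        stored = [name[:-2] for name in os.listdir(working_directory)
                  if name.startswith(cache_prefix) and name.endswith(".p")]
        return [name.split("__")[-1] for name in stored]
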
--- src/synpp/pipeline.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 254e36e..0521ad1 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -648,16 +648,14 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non source_codes[hash] += registry[hash]["wrapper"].source_code # Check where cache is available - cache_available = set() - stored_validation_tokens = {} + cache_available = {} if not working_directory is None: for hash in sorted_hashes: prefix = get_cache_prefix(hash, source_codes[hash]) prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")] if prefixed: - stored_validation_tokens[hash] = [filename.split("__")[-1] for filename in prefixed] - cache_available.add(hash) + cache_available[hash] = [filename.split("__")[-1] for filename in prefixed] registry[hash]["ephemeral"] = False # Set up ephemeral stage counts @@ -692,7 +690,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # 4.8) Manually devalidate stages for hash in sorted_cached_hashes: - if hash not in stored_validation_tokens or current_validation_tokens[hash] not in stored_validation_tokens[hash]: + if hash not in cache_available or current_validation_tokens[hash] not in cache_available[hash]: print(f"Devalidation {hash}: Manually devalidate") stale_hashes.add(hash) From 41f50d2ef1b6043cf64d6ae34206355aa4712324 Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 7 Apr 2023 12:39:52 +0200 Subject: [PATCH 09/22] Remove useless shallow copy of sorted_hashes. --- src/synpp/pipeline.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 0521ad1..d611961 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -672,7 +672,6 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non ephemeral_counts[hash] += 1 # 4) Devalidate stages - sorted_cached_hashes = sorted_hashes stale_hashes = set() # Get current validation tokens @@ -682,14 +681,14 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non registry[stage_id]["wrapper"].validate( ValidateContext(registry[stage_id]["config"], get_cache_directory_path(working_directory, stage_id)) ) - ) for stage_id in sorted_cached_hashes + ) for stage_id in sorted_hashes } # Cache mapper between stage id and cache id. 
- cache_ids = {stage_id: get_cache_id(stage_id, source_codes[stage_id], current_validation_tokens[stage_id]) for stage_id in sorted_cached_hashes} + cache_ids = {stage_id: get_cache_id(stage_id, source_codes[stage_id], current_validation_tokens[stage_id]) for stage_id in sorted_hashes} # 4.8) Manually devalidate stages - for hash in sorted_cached_hashes: + for hash in sorted_hashes: if hash not in cache_available or current_validation_tokens[hash] not in cache_available[hash]: print(f"Devalidation {hash}: Manually devalidate") stale_hashes.add(hash) @@ -700,14 +699,14 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non stale_hashes.update(required_hashes) # 4.5) Devalidate if cache is not existant - for hash in sorted_cached_hashes: + for hash in sorted_hashes: if not hash in cache_available: print(f"Devalidation {hash}: No cache") stale_hashes.add(hash) # 4.6) Devalidate if parent has been updated if working_directory is not None: - for hash in sorted_cached_hashes: + for hash in sorted_hashes: if not hash in stale_hashes: ctime = os.stat(get_cache_file_path(working_directory, cache_ids[hash])).st_mtime_ns # print(f"Cached {stage_id}: {ctime}") From 6f2c07542fe0977b0762eb5360f56fb3770ede27 Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 7 Apr 2023 14:49:14 +0200 Subject: [PATCH 10/22] - fix: keep cache only when no working_directory - create mapping for paths --- src/synpp/pipeline.py | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index d611961..7284a7d 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -686,6 +686,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # Cache mapper between stage id and cache id. 
cache_ids = {stage_id: get_cache_id(stage_id, source_codes[stage_id], current_validation_tokens[stage_id]) for stage_id in sorted_hashes} + file_cache_paths = {stage_id: get_cache_file_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} + dir_cache_paths = {stage_id: get_cache_directory_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} # 4.8) Manually devalidate stages for hash in sorted_hashes: @@ -708,10 +710,10 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if working_directory is not None: for hash in sorted_hashes: if not hash in stale_hashes: - ctime = os.stat(get_cache_file_path(working_directory, cache_ids[hash])).st_mtime_ns + ctime = os.stat(file_cache_paths[hash]).st_mtime_ns # print(f"Cached {stage_id}: {ctime}") for dependency_hash in nx.ancestors(graph, hash): - dependency_ctime = os.stat(get_cache_file_path(working_directory, cache_ids[dependency_hash])).st_mtime_ns + dependency_ctime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns if dependency_ctime > ctime: print(f"Devalidation {hash}: Parent {dependency_hash} updated ({dependency_ctime} > {ctime})") stale_hashes.add(hash) @@ -766,32 +768,28 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non #stage_dependencies = for dependency_hash in stage["dependencies"]: - info_path = get_info_path(working_directory, cache_ids[dependency_hash]) if dependency_hash not in infos and working_directory is not None: - with open(info_path, "rb") as f: + with open(get_info_path(working_directory, cache_ids[dependency_hash]), "rb") as f: infos[dependency_hash] = pickle.load(f) - # Prepare cache path - cache_id = cache_ids[hash] - cache_path = get_cache_directory_path(working_directory, cache_id) - if not working_directory is None: - if os.path.exists(cache_path): - rmtree(cache_path) - os.mkdir(cache_path) + if os.path.exists(dir_cache_paths[hash]): + rmtree(dir_cache_paths[hash]) + os.mkdir(dir_cache_paths[hash]) - context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, infos, cache_ids) + context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], dir_cache_paths[hash], pipeline_config, logger, cache, infos, cache_ids) result = stage["wrapper"].execute(context) if hash in required_hashes: results[required_hashes.index(hash)] = result - cache[hash] = result - if working_directory is not None: - with open(get_cache_file_path(working_directory, cache_id), "wb+") as f: + if working_directory is None: + cache[hash] = result + else: + with open(file_cache_paths[hash], "wb+") as f: logger.info("Writing cache for %s" % hash) pickle.dump(result, f, protocol=4) - with open(get_info_path(working_directory, cache_id), "wb+") as f: + with open(get_info_path(working_directory, cache_ids[hash]), "wb+") as f: logger.info("Writing info for %s" % hash) pickle.dump(context.stage_info, f, protocol=4) infos[hash] = context.stage_info @@ -803,8 +801,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non ephemeral_counts[dependency_hash] -= 1 if ephemeral_counts[dependency_hash] == 0: - cache_directory_path = get_cache_directory_path(working_directory, cache_ids[dependency_hash]) - cache_file_path = get_cache_file_path(working_directory, cache_ids[dependency_hash]) + cache_directory_path = 
dir_cache_paths[dependency_hash] + cache_file_path = file_cache_paths[dependency_hash] rmtree(cache_directory_path) os.remove(cache_file_path) @@ -823,7 +821,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # Load remaining previously cached results for hash in required_hashes: if results[required_hashes.index(hash)] is None: - with open(get_cache_file_path(working_directory, cache_ids[hash]), "rb") as f: + with open(file_cache_paths[hash], "rb") as f: logger.info("Loading cache for %s ..." % hash) results[required_hashes.index(hash)] = pickle.load(f) From 5bd28e0f64ba442c3a9765833f2018bc2cf7ee98 Mon Sep 17 00:00:00 2001 From: Aina Date: Fri, 7 Apr 2023 15:03:05 +0200 Subject: [PATCH 11/22] cleaning --- src/synpp/pipeline.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 7284a7d..58d0fe9 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -654,6 +654,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non for hash in sorted_hashes: prefix = get_cache_prefix(hash, source_codes[hash]) prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")] + if prefixed: cache_available[hash] = [filename.split("__")[-1] for filename in prefixed] registry[hash]["ephemeral"] = False @@ -692,18 +693,15 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # 4.8) Manually devalidate stages for hash in sorted_hashes: if hash not in cache_available or current_validation_tokens[hash] not in cache_available[hash]: - print(f"Devalidation {hash}: Manually devalidate") stale_hashes.add(hash) # 4.1) Devalidate if they are required (optional, otherwise will reload from cache) if rerun_required: - print(f"Devalidation {required_hashes}: Requirement") stale_hashes.update(required_hashes) # 4.5) Devalidate if cache is not existant for hash in sorted_hashes: if not hash in cache_available: - print(f"Devalidation {hash}: No cache") stale_hashes.add(hash) # 4.6) Devalidate if parent has been updated @@ -711,15 +709,11 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non for hash in sorted_hashes: if not hash in stale_hashes: ctime = os.stat(file_cache_paths[hash]).st_mtime_ns - # print(f"Cached {stage_id}: {ctime}") for dependency_hash in nx.ancestors(graph, hash): dependency_ctime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns if dependency_ctime > ctime: - print(f"Devalidation {hash}: Parent {dependency_hash} updated ({dependency_ctime} > {ctime})") stale_hashes.add(hash) break - if dependency_ctime == ctime: - print(f"{hash} and {dependency_hash}: {dependency_ctime} == {ctime}") # 4.9) Devalidate descendants of devalidated stages @@ -772,12 +766,15 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non with open(get_info_path(working_directory, cache_ids[dependency_hash]), "rb") as f: infos[dependency_hash] = pickle.load(f) + # Prepare cache path + cache_path = dir_cache_paths[hash] + if not working_directory is None: - if os.path.exists(dir_cache_paths[hash]): - rmtree(dir_cache_paths[hash]) - os.mkdir(dir_cache_paths[hash]) + if os.path.exists(cache_path): + rmtree(cache_path) + os.mkdir(cache_path) - context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], dir_cache_paths[hash], pipeline_config, logger, 
cache, infos, cache_ids) + context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, infos, cache_ids) result = stage["wrapper"].execute(context) if hash in required_hashes: From d75dfba2f660e67fadf2f8ae320cc7c9ecfee382 Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 18 Apr 2023 16:14:12 +0200 Subject: [PATCH 12/22] use stage digest instead of stage source code --- src/synpp/pipeline.py | 47 ++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 58d0fe9..4bc073e 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -67,7 +67,7 @@ class StageInstance: def __init__(self, instance, name, module_hash): self.instance = instance self.name = name - self.source_code = module_hash + self.module_hash = module_hash if not hasattr(self.instance, "execute"): raise RuntimeError("Stage %s does not have execute method" % self.name) @@ -88,8 +88,11 @@ def execute(self, context): return self.instance.execute(context) -def get_source_code(descriptor): - return inspect.getsource(descriptor) +def get_stage_hash(descriptor): + source = inspect.getsource(descriptor) + hash = hashlib.md5() + hash.update(source.encode("utf-8")) + return hash.hexdigest() def synpp_import_module(name, package=None, externals={}): @@ -130,25 +133,25 @@ def resolve_stage(descriptor, externals: dict = {}, aliases: dict = {}): return None # definitely not a stage if inspect.ismodule(descriptor): - source_code = get_source_code(descriptor) - return StageInstance(descriptor, descriptor.__name__, source_code) + stage_hash = get_stage_hash(descriptor) + return StageInstance(descriptor, descriptor.__name__, stage_hash) if inspect.isclass(descriptor): - source_code = get_source_code(descriptor) - return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), source_code) + stage_hash = get_stage_hash(descriptor) + return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), stage_hash) if inspect.isfunction(descriptor): if not hasattr(descriptor, 'stage_params'): raise PipelineError("Functions need to be decorated with @synpp.stage in order to be used in the pipeline.") function_stage = DecoratedStage(execute_func=descriptor, stage_params=descriptor.stage_params) - source_code = get_source_code(descriptor) - return StageInstance(function_stage, "%s.%s" % (descriptor.__module__, descriptor.__name__), source_code) + stage_hash = get_stage_hash(descriptor) + return StageInstance(function_stage, "%s.%s" % (descriptor.__module__, descriptor.__name__), stage_hash) if hasattr(descriptor, 'execute'): # Last option: arbitrary object which looks like a stage clazz = descriptor.__class__ - source_code = get_source_code(clazz) - return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), source_code) + stage_hash = get_stage_hash(clazz) + return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), stage_hash) # couldn't resolve stage (this is something else) return None @@ -212,19 +215,16 @@ def configure_name(name, config): def hash_name(name, config): if len(config) > 0: - encoded_config = json.dumps(config, sort_keys = True).encode("utf-8") - config_digest = hashlib.md5(encoded_config).hexdigest() - name += "__" + config_digest - return name + hash = hashlib.md5() + hash.update(json.dumps(config, sort_keys = True).encode("utf-8")) + 
return "%s__%s" % (name, hash.hexdigest()) + else: + return name def get_cache_prefix(stage_id, source_code): cache_prefix = stage_id + "__" + hashlib.md5(source_code.encode()).hexdigest() return cache_prefix -def get_cache_id(stage_id, source_code, validation_token): - cache_id = get_cache_prefix(stage_id, source_code) + "__" + str(validation_token) - return cache_id - def get_cache_directory_path(working_directory, cache_id): return "%s/%s.cache" % (working_directory, cache_id) @@ -573,6 +573,7 @@ def process_stages(definitions, global_config, externals={}, aliases={}): return registry + def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False, logger = logging.getLogger("synpp"), rerun_required=True, ensure_working_directory=False, externals = {}, aliases = {}): @@ -640,12 +641,12 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non sorted_hashes = list(nx.topological_sort(graph)) # Compute cache prefixes by appending source code digest - source_codes = dict() + source_codes = {} for hash in sorted_hashes: source_codes[hash] = "" for dependency_hash in nx.ancestors(graph, hash): - source_codes[hash] += registry[dependency_hash]["wrapper"].source_code - source_codes[hash] += registry[hash]["wrapper"].source_code + source_codes[hash] += registry[dependency_hash]["wrapper"].module_hash + source_codes[hash] += registry[hash]["wrapper"].module_hash # Check where cache is available cache_available = {} @@ -686,7 +687,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non } # Cache mapper between stage id and cache id. - cache_ids = {stage_id: get_cache_id(stage_id, source_codes[stage_id], current_validation_tokens[stage_id]) for stage_id in sorted_hashes} + cache_ids = {stage_id: get_cache_prefix(stage_id, source_codes[stage_id]) + "__" + str(current_validation_tokens[stage_id]) for stage_id in sorted_hashes} file_cache_paths = {stage_id: get_cache_file_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} dir_cache_paths = {stage_id: get_cache_directory_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} From 071d78d54b13c645983bca984804fbf1f1fdeefa Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 18 Apr 2023 16:22:41 +0200 Subject: [PATCH 13/22] Clarify variable names and comment --- src/synpp/pipeline.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 4bc073e..09403ff 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -640,20 +640,20 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non sorted_hashes = list(nx.topological_sort(graph)) - # Compute cache prefixes by appending source code digest - source_codes = {} + # Concatenate source digest of dependencies + source_digests = {} for hash in sorted_hashes: - source_codes[hash] = "" + source_digests[hash] = "" for dependency_hash in nx.ancestors(graph, hash): - source_codes[hash] += registry[dependency_hash]["wrapper"].module_hash - source_codes[hash] += registry[hash]["wrapper"].module_hash + source_digests[hash] += registry[dependency_hash]["wrapper"].module_hash + source_digests[hash] += registry[hash]["wrapper"].module_hash # Check where cache is available cache_available = {} if not working_directory is None: for hash in sorted_hashes: - prefix = get_cache_prefix(hash, source_codes[hash]) + prefix = get_cache_prefix(hash, 
source_digests[hash]) prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")] if prefixed: @@ -687,7 +687,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non } # Cache mapper between stage id and cache id. - cache_ids = {stage_id: get_cache_prefix(stage_id, source_codes[stage_id]) + "__" + str(current_validation_tokens[stage_id]) for stage_id in sorted_hashes} + cache_ids = {stage_id: get_cache_prefix(stage_id, source_digests[stage_id]) + "__" + str(current_validation_tokens[stage_id]) for stage_id in sorted_hashes} file_cache_paths = {stage_id: get_cache_file_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} dir_cache_paths = {stage_id: get_cache_directory_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} From bfd61aa840e0a150f05d1bbdc404a9a91d52cc49 Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 18 Apr 2023 16:25:18 +0200 Subject: [PATCH 14/22] remove empty lines --- tests/test_devalidate.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_devalidate.py b/tests/test_devalidate.py index 457dec8..5f49a4a 100644 --- a/tests/test_devalidate.py +++ b/tests/test_devalidate.py @@ -65,7 +65,6 @@ def test_devalidate_by_parent(tmpdir): assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] time.sleep(0.1) - result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.C" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -76,7 +75,6 @@ def test_devalidate_by_parent(tmpdir): assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] time.sleep(0.1) - result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.A2" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -87,7 +85,6 @@ def test_devalidate_by_parent(tmpdir): assert not "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] time.sleep(0.1) - result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.C" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) From c8e382b3c9ba34035ea27cf27c546467fad5dd48 Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 16 May 2023 18:45:28 +0200 Subject: [PATCH 15/22] fix manual devalidation (with token) behavior --- src/synpp/pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 09403ff..d3d2810 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -693,7 +693,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non # 4.8) Manually devalidate stages for hash in sorted_hashes: - if hash not in cache_available or current_validation_tokens[hash] not in cache_available[hash]: + if hash in cache_available and current_validation_tokens[hash] not in cache_available[hash]: stale_hashes.add(hash) # 4.1) Devalidate if they are required (optional, otherwise will reload from cache) @@ -716,7 +716,6 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non stale_hashes.add(hash) break - # 4.9) Devalidate descendants of devalidated stages for hash in set(stale_hashes): for descendant_hash in nx.descendants(graph, hash): From d06b2948dc007369e188a8caa99855528af72e42 Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 16 May 2023 18:57:38 +0200 Subject: [PATCH 16/22] devalidate if parent has been updated: check 
dependency only if not stale (it will devalidate at "Devalidate descendants of devalidated stages") --- src/synpp/pipeline.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index d3d2810..5c59586 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -711,10 +711,11 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not hash in stale_hashes: ctime = os.stat(file_cache_paths[hash]).st_mtime_ns for dependency_hash in nx.ancestors(graph, hash): - dependency_ctime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns - if dependency_ctime > ctime: - stale_hashes.add(hash) - break + if dependency_hash not in stale_hashes: + dependency_ctime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns + if dependency_ctime > ctime: + stale_hashes.add(hash) + break # 4.9) Devalidate descendants of devalidated stages for hash in set(stale_hashes): From 24933f1020367268037571fa6563e850abbd383e Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 16 May 2023 19:53:13 +0200 Subject: [PATCH 17/22] clean log --- src/synpp/pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 5c59586..64ffee7 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -737,8 +737,6 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non logger.info("Devalidating %d stages:" % len(stale_hashes)) for hash in stale_hashes: logger.info("- %s" % hash) - logger.info("Successfully reset meta data") - # 6) Execute stages results = [None] * len(definitions) cache = {} From efae443f64a2955fdda842ea766ed4b7dacb4c5c Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 16 May 2023 19:53:49 +0200 Subject: [PATCH 18/22] separate with and without wd behaviors --- src/synpp/pipeline.py | 57 ++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 64ffee7..a9c8536 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -674,6 +674,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non ephemeral_counts[hash] += 1 # 4) Devalidate stages + sorted_cached_hashes = sorted_hashes - ephemeral_counts.keys() stale_hashes = set() # Get current validation tokens @@ -691,23 +692,29 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non file_cache_paths = {stage_id: get_cache_file_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} dir_cache_paths = {stage_id: get_cache_directory_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()} - # 4.8) Manually devalidate stages - for hash in sorted_hashes: - if hash in cache_available and current_validation_tokens[hash] not in cache_available[hash]: - stale_hashes.add(hash) - # 4.1) Devalidate if they are required (optional, otherwise will reload from cache) if rerun_required: stale_hashes.update(required_hashes) - # 4.5) Devalidate if cache is not existant - for hash in sorted_hashes: - if not hash in cache_available: - stale_hashes.add(hash) + if working_directory is None: + # If no working_directory, devalidate all dependencies + for hash in list(stale_hashes): + for dependency_hash in nx.ancestors(graph, hash): + stale_hashes.add(dependency_hash) - # 4.6) Devalidate if parent has been updated - if working_directory is not None: - for hash in sorted_hashes: + else: + # 4.5) Devalidate if 
cache is not existant + for hash in sorted_cached_hashes: + if not hash in cache_available: + stale_hashes.add(hash) + + # 4.8) Manually devalidate stages + for hash in sorted_cached_hashes: + if hash in cache_available and current_validation_tokens[hash] not in cache_available[hash]: + stale_hashes.add(hash) + + # 4.6) Devalidate if parent has been updated + for hash in sorted_cached_hashes: if not hash in stale_hashes: ctime = os.stat(file_cache_paths[hash]).st_mtime_ns for dependency_hash in nx.ancestors(graph, hash): @@ -717,22 +724,22 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non stale_hashes.add(hash) break - # 4.9) Devalidate descendants of devalidated stages - for hash in set(stale_hashes): - for descendant_hash in nx.descendants(graph, hash): - if not descendant_hash in stale_hashes: - stale_hashes.add(descendant_hash) + # 4.9) Devalidate descendants of devalidated stages + for hash in set(stale_hashes): + for descendant_hash in nx.descendants(graph, hash): + if not descendant_hash in stale_hashes: + stale_hashes.add(descendant_hash) - # 4.10) Devalidate ephemeral stages if necessary - pending = set(stale_hashes) + # 4.10) Devalidate ephemeral stages if necessary + pending = set(stale_hashes) - while len(pending) > 0: - for dependency_hash in registry[pending.pop()]["dependencies"]: - if registry[dependency_hash]["ephemeral"]: - if not dependency_hash in stale_hashes: - pending.add(dependency_hash) + while len(pending) > 0: + for dependency_hash in registry[pending.pop()]["dependencies"]: + if registry[dependency_hash]["ephemeral"]: + if not dependency_hash in stale_hashes: + pending.add(dependency_hash) - stale_hashes.add(dependency_hash) + stale_hashes.add(dependency_hash) logger.info("Devalidating %d stages:" % len(stale_hashes)) for hash in stale_hashes: logger.info("- %s" % hash) From df37cbb6145b047070f9cd4c80445eedf2a7a993 Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 16 May 2023 20:12:30 +0200 Subject: [PATCH 19/22] fix ephemeral behavior and add corresponding test --- src/synpp/pipeline.py | 2 +- tests/fixtures/ephemeral/E.py | 5 +++++ tests/test_ephemeral.py | 21 +++++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 tests/fixtures/ephemeral/E.py diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index a9c8536..84545b7 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -718,7 +718,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non if not hash in stale_hashes: ctime = os.stat(file_cache_paths[hash]).st_mtime_ns for dependency_hash in nx.ancestors(graph, hash): - if dependency_hash not in stale_hashes: + if dependency_hash not in stale_hashes and dependency_hash in cache_available: dependency_ctime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns if dependency_ctime > ctime: stale_hashes.add(hash) diff --git a/tests/fixtures/ephemeral/E.py b/tests/fixtures/ephemeral/E.py new file mode 100644 index 0000000..24bd06a --- /dev/null +++ b/tests/fixtures/ephemeral/E.py @@ -0,0 +1,5 @@ +def configure(context): + context.stage("tests.fixtures.ephemeral.D") + +def execute(context): + pass diff --git a/tests/test_ephemeral.py b/tests/test_ephemeral.py index dfea774..db0012a 100644 --- a/tests/test_ephemeral.py +++ b/tests/test_ephemeral.py @@ -85,3 +85,24 @@ def test_ephemeral_BD(tmpdir): assert not "tests.fixtures.ephemeral.B" in result["stale"] assert "tests.fixtures.ephemeral.C" in result["stale"] assert "tests.fixtures.ephemeral.D" in 
result["stale"] + +def test_ephemeral_E(tmpdir): + working_directory = tmpdir.mkdir("cache") + + result = synpp.run([ + { "descriptor": "tests.fixtures.ephemeral.E" }, + ], working_directory = working_directory, verbose = True) + + assert "tests.fixtures.ephemeral.A" in result["stale"] + assert "tests.fixtures.ephemeral.C" in result["stale"] + assert "tests.fixtures.ephemeral.D" in result["stale"] + assert "tests.fixtures.ephemeral.E" in result["stale"] + + result = synpp.run([ + { "descriptor": "tests.fixtures.ephemeral.E" }, + ], working_directory = working_directory, verbose = True) + + assert not "tests.fixtures.ephemeral.A" in result["stale"] + assert not "tests.fixtures.ephemeral.C" in result["stale"] + assert not "tests.fixtures.ephemeral.D" in result["stale"] + assert "tests.fixtures.ephemeral.E" in result["stale"] \ No newline at end of file From ade4a5666a0949dc0df765290ed080bcdcb543b0 Mon Sep 17 00:00:00 2001 From: Aina Date: Tue, 16 May 2023 23:21:05 +0200 Subject: [PATCH 20/22] fix hash code (still test to do) --- src/synpp/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py index 84545b7..4b28655 100644 --- a/src/synpp/pipeline.py +++ b/src/synpp/pipeline.py @@ -644,7 +644,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non source_digests = {} for hash in sorted_hashes: source_digests[hash] = "" - for dependency_hash in nx.ancestors(graph, hash): + for dependency_hash in sorted(nx.ancestors(graph, hash)): source_digests[hash] += registry[dependency_hash]["wrapper"].module_hash source_digests[hash] += registry[hash]["wrapper"].module_hash From e4e26b406b55084fa3d0fd532e695c72f7017589 Mon Sep 17 00:00:00 2001 From: Aina Date: Wed, 17 May 2023 10:40:16 +0200 Subject: [PATCH 21/22] add test for devalidation stability between runs can we make this test better? 
--- tests/fixtures/devalidation/E.py | 11 +++++++++++ tests/fixtures/devalidation/E1.py | 5 +++++ tests/fixtures/devalidation/E2.py | 5 +++++ tests/test_devalidate.py | 33 +++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+) create mode 100644 tests/fixtures/devalidation/E.py create mode 100644 tests/fixtures/devalidation/E1.py create mode 100644 tests/fixtures/devalidation/E2.py diff --git a/tests/fixtures/devalidation/E.py b/tests/fixtures/devalidation/E.py new file mode 100644 index 0000000..01259f7 --- /dev/null +++ b/tests/fixtures/devalidation/E.py @@ -0,0 +1,11 @@ +def configure(context): + context.stage("tests.fixtures.devalidation.D", config={"d": 10}, alias="d") + context.stage("tests.fixtures.devalidation.C") + context.stage("tests.fixtures.devalidation.E1") + context.stage("tests.fixtures.devalidation.E2") + +def execute(context): + context.stage("tests.fixtures.devalidation.E1") + context.stage("tests.fixtures.devalidation.E2") + context.stage("d") + return context.stage("tests.fixtures.devalidation.C") diff --git a/tests/fixtures/devalidation/E1.py b/tests/fixtures/devalidation/E1.py new file mode 100644 index 0000000..1ae5e75 --- /dev/null +++ b/tests/fixtures/devalidation/E1.py @@ -0,0 +1,5 @@ +def configure(context): + pass + +def execute(context): + return 20 diff --git a/tests/fixtures/devalidation/E2.py b/tests/fixtures/devalidation/E2.py new file mode 100644 index 0000000..9a65ecc --- /dev/null +++ b/tests/fixtures/devalidation/E2.py @@ -0,0 +1,5 @@ +def configure(context): + pass + +def execute(context): + return 40 diff --git a/tests/test_devalidate.py b/tests/test_devalidate.py index 5f49a4a..4bda5e0 100644 --- a/tests/test_devalidate.py +++ b/tests/test_devalidate.py @@ -117,6 +117,39 @@ def test_devalidate_descendants(tmpdir): assert "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] +import os +import sys +def test_devalidation_stability(tmpdir): + temp_directory = tmpdir.mkdir("sub") + working_directory = os.path.join(temp_directory, "cache") + config_yml_path = os.path.join(temp_directory, "config.yml") + config_yml = f"""run: + - tests.fixtures.devalidation.E + +working_directory: {working_directory} + +config: + a: 1""" + + with open(config_yml_path, "w") as f: + f.write(config_yml) + + os.system(f"python -m synpp {config_yml_path}") + + result = synpp.run([{ + "descriptor": "tests.fixtures.devalidation.E" + }], config = { "a": 1 }, working_directory = working_directory, verbose = True) + + assert not "tests.fixtures.devalidation.A1__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert not "tests.fixtures.devalidation.A2" in result["stale"] + assert not "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert not "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert "tests.fixtures.devalidation.E__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert not "tests.fixtures.devalidation.E1" in result["stale"] + assert not "tests.fixtures.devalidation.E2" in result["stale"] + print("done") + + def test_devalidate_token(tmpdir): working_directory = tmpdir.mkdir("sub") path = "%s/test.fixture" % working_directory From 6c790dd78f172b6fd145ed833fd903930268f60a Mon Sep 17 00:00:00 2001 From: Aina Date: Wed, 17 May 2023 10:40:28 +0200 Subject: [PATCH 22/22] clean --- tests/test_devalidate.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/tests/test_devalidate.py b/tests/test_devalidate.py index 4bda5e0..a2f871a 100644 --- a/tests/test_devalidate.py +++ b/tests/test_devalidate.py @@ -147,7 +147,6 @@ def test_devalidation_stability(tmpdir): assert "tests.fixtures.devalidation.E__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert not "tests.fixtures.devalidation.E1" in result["stale"] assert not "tests.fixtures.devalidation.E2" in result["stale"] - print("done") def test_devalidate_token(tmpdir):
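For reference, a minimal sketch of how a stage's cache id is composed after this series, based on get_cache_prefix and the cache_ids mapping shown in the diffs above (the helper below is illustrative only, not part of the library):

    import hashlib

    def sketch_cache_id(stage_hash, source_digest, validation_token):
        # get_cache_prefix: configured stage id plus the md5 of the concatenated module digests
        prefix = stage_hash + "__" + hashlib.md5(source_digest.encode()).hexdigest()
        # the validation token is appended, so a changed token yields a new cache id
        return prefix + "__" + str(validation_token)

The resulting id is then combined with the working directory by get_cache_file_path ("<id>.p") and get_cache_directory_path ("<id>.cache") to locate the cached result on disk.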