diff --git a/src/synpp/pipeline.py b/src/synpp/pipeline.py
index 031adf2..4b28655 100644
--- a/src/synpp/pipeline.py
+++ b/src/synpp/pipeline.py
@@ -221,6 +221,18 @@ def hash_name(name, config):
     else:
         return name
 
+def get_cache_prefix(stage_id, source_code):
+    cache_prefix = stage_id + "__" + hashlib.md5(source_code.encode()).hexdigest()
+    return cache_prefix
+
+def get_cache_directory_path(working_directory, cache_id):
+    return "%s/%s.cache" % (working_directory, cache_id)
+
+def get_cache_file_path(working_directory, cache_id):
+    return "%s/%s.p" % (working_directory, cache_id)
+
+def get_info_path(working_directory, cache_id):
+    return "%s/%s.info" % (working_directory, cache_id)
 
 class ConfiguredStage:
     def __init__(self, instance, config, configuration_context):
@@ -310,7 +322,7 @@ def path(self):
 
 class ExecuteContext(Context):
-    def __init__(self, required_config, required_stages, aliases, working_directory, dependencies, cache_path, pipeline_config, logger, cache, dependency_info):
+    def __init__(self, required_config, required_stages, aliases, working_directory, dependencies, cache_path, pipeline_config, logger, cache, dependency_info, cache_ids):
         self.required_config = required_config
         self.working_directory = working_directory
         self.dependencies = dependencies
@@ -323,6 +335,7 @@ def __init__(self, required_config, required_stages, aliases, working_directory,
         self.dependency_info = dependency_info
         self.aliases = aliases
         self.required_stages = required_stages
+        self.cache_ids = cache_ids
 
         self.progress_context = None
@@ -339,7 +352,7 @@ def stage(self, name, config = {}):
             return self.cache[dependency]
         else:
             if not dependency in self.dependency_cache:
-                with open("%s/%s.p" % (self.working_directory, dependency), "rb") as f:
+                with open(get_cache_file_path(self.working_directory, self.cache_ids[dependency]), "rb") as f:
                     self.logger.info("Loading cache for %s ..." % dependency)
                     self.dependency_cache[dependency] = pickle.load(f)
@@ -353,7 +366,7 @@ def path(self, name = None, config = {}):
             return self.cache_path
 
         dependency = self._get_dependency({ "descriptor": name, "config": config })
-        return "%s/%s.cache" % (self.working_directory, dependency)
+        return get_cache_directory_path(self.working_directory, self.cache_ids[dependency])
 
     def set_info(self, name, value):
         self.stage_info[name] = value
@@ -561,16 +574,6 @@ def process_stages(definitions, global_config, externals={}, aliases={}):
 
     return registry
 
-def update_json(meta, working_directory):
-    if os.path.exists("%s/pipeline.json" % working_directory):
-        shutil.move("%s/pipeline.json" % working_directory, "%s/pipeline.json.bk" % working_directory)
-
-    with open("%s/pipeline.json.new" % working_directory, "w+") as f:
-        json.dump(meta, f)
-
-    shutil.move("%s/pipeline.json.new" % working_directory, "%s/pipeline.json" % working_directory)
-
-
 def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False,
         logger = logging.getLogger("synpp"), rerun_required=True, ensure_working_directory=False, externals = {}, aliases = {}):
@@ -637,16 +640,24 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = None,
     sorted_hashes = list(nx.topological_sort(graph))
 
+    # Build a source digest per stage from the module hashes of its ancestors and itself
+    source_digests = {}
+    for hash in sorted_hashes:
+        source_digests[hash] = ""
+        for dependency_hash in sorted(nx.ancestors(graph, hash)):
+            source_digests[hash] += registry[dependency_hash]["wrapper"].module_hash
+        source_digests[hash] += registry[hash]["wrapper"].module_hash
+
     # Check where cache is available
-    cache_available = set()
+    cache_available = {}
 
     if not working_directory is None:
         for hash in sorted_hashes:
-            directory_path = "%s/%s.cache" % (working_directory, hash)
-            file_path = "%s/%s.p" % (working_directory, hash)
+            prefix = get_cache_prefix(hash, source_digests[hash])
+            prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")]
 
-            if os.path.exists(directory_path) and os.path.exists(file_path):
-                cache_available.add(hash)
+            if prefixed:
+                cache_available[hash] = [filename.split("__")[-1] for filename in prefixed]
                 registry[hash]["ephemeral"] = False
 
     # Set up ephemeral stage counts
@@ -662,122 +673,84 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = None,
                 ephemeral_counts[hash] += 1
 
-    # 3) Load information about stages
-    meta = {}
-
-    if not working_directory is None:
-        try:
-            with open("%s/pipeline.json" % working_directory) as f:
-                meta = json.load(f)
-                logger.info("Found pipeline metadata in %s/pipeline.json" % working_directory)
-        except FileNotFoundError:
-            logger.info("Did not find pipeline metadata in %s/pipeline.json" % working_directory)
-
     # 4) Devalidate stages
     sorted_cached_hashes = sorted_hashes - ephemeral_counts.keys()
     stale_hashes = set()
 
+    # Get current validation tokens
+    current_validation_tokens = {
+        stage_id:
+            str(
+                registry[stage_id]["wrapper"].validate(
+                    ValidateContext(registry[stage_id]["config"], get_cache_directory_path(working_directory, stage_id))
+                )
+            ) for stage_id in sorted_hashes
+    }
+
+    # Map each stage id to its cache id
+    cache_ids = {stage_id: get_cache_prefix(stage_id, source_digests[stage_id]) + "__" + str(current_validation_tokens[stage_id]) for stage_id in sorted_hashes}
+    file_cache_paths = {stage_id: get_cache_file_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()}
+    dir_cache_paths = {stage_id: get_cache_directory_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()}
+
     # 4.1) Devalidate if they are required (optional, otherwise will reload from cache)
     if rerun_required:
         stale_hashes.update(required_hashes)
 
-    # 4.2) Devalidate if not in meta
-    for hash in sorted_cached_hashes:
-        if not hash in meta:
-            stale_hashes.add(hash)
-
-    # 4.3) Devalidate if configuration values have changed
-    # This devalidation step is obsolete since we have implicit config parameters
-
-    # 4.4) Devalidate if module hash of a stage has changed
-    for hash in sorted_cached_hashes:
-        if hash in meta:
-            if not "module_hash" in meta[hash]:
-                stale_hashes.add(hash) # Backwards compatibility
-
-            else:
-                previous_module_hash = meta[hash]["module_hash"]
-                current_module_hash = registry[hash]["wrapper"].module_hash
-
-                if previous_module_hash != current_module_hash:
-                    stale_hashes.add(hash)
+    if working_directory is None:
+        # If there is no working directory, devalidate all dependencies
+        for hash in list(stale_hashes):
+            for dependency_hash in nx.ancestors(graph, hash):
+                stale_hashes.add(dependency_hash)
 
-    # 4.5) Devalidate if cache is not existant
-    if not working_directory is None:
+    else:
+        # 4.5) Devalidate if the cache does not exist
         for hash in sorted_cached_hashes:
-            directory_path = "%s/%s.cache" % (working_directory, hash)
-            file_path = "%s/%s.p" % (working_directory, hash)
-
             if not hash in cache_available:
                 stale_hashes.add(hash)
 
-    # 4.6) Devalidate if parent has been updated
-    for hash in sorted_cached_hashes:
-        if not hash in stale_hashes and hash in meta:
-            for dependency_hash, dependency_update in meta[hash]["dependencies"].items():
-                if not dependency_hash in meta:
-                    stale_hashes.add(hash)
-                else:
-                    if meta[dependency_hash]["updated"] > dependency_update:
-                        stale_hashes.add(hash)
-
-    # 4.7) Devalidate if parents are not the same anymore
-    for hash in sorted_cached_hashes:
-        if not hash in stale_hashes and hash in meta:
-            cached_hashes = set(meta[hash]["dependencies"].keys())
-            current_hashes = set(registry[hash]["dependencies"] if "dependencies" in registry[hash] else [])
-
-            if not cached_hashes == current_hashes:
+        # 4.8) Manually devalidate stages
+        for hash in sorted_cached_hashes:
+            if hash in cache_available and current_validation_tokens[hash] not in cache_available[hash]:
                 stale_hashes.add(hash)
 
-    # 4.8) Manually devalidate stages
-    for hash in sorted_cached_hashes:
-        stage = registry[hash]
-        cache_path = "%s/%s.cache" % (working_directory, hash)
-        context = ValidateContext(stage["config"], cache_path)
-
-        validation_token = stage["wrapper"].validate(context)
-        existing_token = meta[hash]["validation_token"] if hash in meta and "validation_token" in meta[hash] else None
-
-        if not validation_token == existing_token:
-            stale_hashes.add(hash)
-
-    # 4.9) Devalidate descendants of devalidated stages
-    for hash in set(stale_hashes):
-        for descendant_hash in nx.descendants(graph, hash):
-            if not descendant_hash in stale_hashes:
-                stale_hashes.add(descendant_hash)
-
-    # 4.10) Devalidate ephemeral stages if necessary
-    pending = set(stale_hashes)
-
-    while len(pending) > 0:
-        for dependency_hash in registry[pending.pop()]["dependencies"]:
-            if registry[dependency_hash]["ephemeral"]:
-                if not dependency_hash in stale_hashes:
-                    pending.add(dependency_hash)
-
-                    stale_hashes.add(dependency_hash)
+        # 4.6) Devalidate if a parent has been updated
+        for hash in sorted_cached_hashes:
+            if not hash in stale_hashes:
+                mtime = os.stat(file_cache_paths[hash]).st_mtime_ns
+                for dependency_hash in nx.ancestors(graph, hash):
+                    if dependency_hash not in stale_hashes and dependency_hash in cache_available:
+                        dependency_mtime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns
+                        if dependency_mtime > mtime:
+                            stale_hashes.add(hash)
+                            break
+
+        # 4.9) Devalidate descendants of devalidated stages
+        for hash in set(stale_hashes):
+            for descendant_hash in nx.descendants(graph, hash):
+                if not descendant_hash in stale_hashes:
+                    stale_hashes.add(descendant_hash)
+
+        # 4.10) Devalidate ephemeral stages if necessary
+        pending = set(stale_hashes)
+
+        while len(pending) > 0:
+            for dependency_hash in registry[pending.pop()]["dependencies"]:
+                if registry[dependency_hash]["ephemeral"]:
+                    if not dependency_hash in stale_hashes:
+                        pending.add(dependency_hash)
+
+                        stale_hashes.add(dependency_hash)
 
     logger.info("Devalidating %d stages:" % len(stale_hashes))
     for hash in stale_hashes: logger.info("- %s" % hash)
 
-    # 5) Reset meta information
-    for hash in stale_hashes:
-        if hash in meta:
-            del meta[hash]
-
-    if not working_directory is None:
-        update_json(meta, working_directory)
-
-        logger.info("Successfully reset meta data")
-
     # 6) Execute stages
     results = [None] * len(definitions)
     cache = {}
 
     progress = 0
+    infos = {}
 
     for hash in sorted_hashes:
         if hash in stale_hashes:
             logger.info("Executing stage %s ..." % hash)
@@ -794,21 +767,21 @@ def run(definitions, config = {}, working_directory = None,
             #    stage_dependency_info[parent] = meta[parent]["info"]
             #stage_dependencies =
 
-            stage_dependency_info = {}
             for dependency_hash in stage["dependencies"]:
-                stage_dependency_info[dependency_hash] = meta[dependency_hash]["info"]
+                if dependency_hash not in infos and working_directory is not None:
+                    with open(get_info_path(working_directory, cache_ids[dependency_hash]), "rb") as f:
+                        infos[dependency_hash] = pickle.load(f)
 
             # Prepare cache path
-            cache_path = "%s/%s.cache" % (working_directory, hash)
+            cache_path = dir_cache_paths[hash]
 
             if not working_directory is None:
                 if os.path.exists(cache_path): rmtree(cache_path)
                 os.mkdir(cache_path)
 
-            context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, stage_dependency_info)
+            context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, infos, cache_ids)
             result = stage["wrapper"].execute(context)
-            validation_token = stage["wrapper"].validate(ValidateContext(stage["config"], cache_path))
 
             if hash in required_hashes:
                 results[required_hashes.index(hash)] = result
@@ -816,24 +789,13 @@ def run(definitions, config = {}, working_directory = None,
             if working_directory is None:
                 cache[hash] = result
             else:
-                with open("%s/%s.p" % (working_directory, hash), "wb+") as f:
+                with open(file_cache_paths[hash], "wb+") as f:
                     logger.info("Writing cache for %s" % hash)
                     pickle.dump(result, f, protocol=4)
-
-            # Update meta information
-            meta[hash] = {
-                "config": stage["config"],
-                "updated": datetime.datetime.utcnow().timestamp(),
-                "dependencies": {
-                    dependency_hash: meta[dependency_hash]["updated"] for dependency_hash in stage["dependencies"]
-                },
-                "info": context.stage_info,
-                "validation_token": validation_token,
-                "module_hash": stage["wrapper"].module_hash
-            }
-
-            if not working_directory is None:
-                update_json(meta, working_directory)
+                with open(get_info_path(working_directory, cache_ids[hash]), "wb+") as f:
+                    logger.info("Writing info for %s" % hash)
+                    pickle.dump(context.stage_info, f, protocol=4)
+            infos[hash] = context.stage_info
 
             # Clear cache for ephemeral stages if they are no longer needed
             if not working_directory is None:
@@ -842,8 +804,8 @@ def run(definitions, config = {}, working_directory = None,
                     ephemeral_counts[dependency_hash] -= 1
 
                     if ephemeral_counts[dependency_hash] == 0:
-                        cache_directory_path = "%s/%s.cache" % (working_directory, dependency_hash)
-                        cache_file_path = "%s/%s.p" % (working_directory, dependency_hash)
+                        cache_directory_path = dir_cache_paths[dependency_hash]
+                        cache_file_path = file_cache_paths[dependency_hash]
 
                         rmtree(cache_directory_path)
                         os.remove(cache_file_path)
@@ -862,15 +824,15 @@ def run(definitions, config = {}, working_directory = None,
     # Load remaining previously cached results
    for hash in required_hashes:
        if results[required_hashes.index(hash)] is None:
-            with open("%s/%s.p" % (working_directory, hash), "rb") as f:
+            with open(file_cache_paths[hash], "rb") as f:
                logger.info("Loading cache for %s ..." % hash)
                results[required_hashes.index(hash)] = pickle.load(f)
 
     if verbose:
         info = {}
 
-        for hash in sorted(meta.keys()):
-            info.update(meta[hash]["info"])
+        for hash in infos.keys():
+            info.update(infos[hash])
 
         return {
             "results": results,
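Taken together, the pipeline.py changes above make every cache artifact self-describing: the file name itself encodes the stage id, an MD5 digest over the module hashes of the stage and all of its ancestors, and the stringified validation token. That is what allows pipeline.json and update_json() to be dropped entirely. A minimal stand-alone sketch of the naming scheme (the module hashes and the validation token below are hypothetical placeholders; synpp derives the real values internally):

    import hashlib

    def get_cache_prefix(stage_id, source_code):
        # Same derivation as in pipeline.py: stage id plus MD5 of the source digest
        return stage_id + "__" + hashlib.md5(source_code.encode()).hexdigest()

    # Assumed module hashes for a stage "C" with ancestors "A" and "B"
    module_hashes = {"A": "aaa111", "B": "bbb222", "C": "ccc333"}

    # Ancestor hashes are concatenated in sorted order, then the stage's own hash
    source_digest = "".join(module_hashes[h] for h in sorted(["A", "B"])) + module_hashes["C"]

    validation_token = "None"  # str() of whatever the stage's validate() returns
    cache_id = get_cache_prefix("C", source_digest) + "__" + validation_token

    print(cache_id + ".p")      # pickled stage result
    print(cache_id + ".cache")  # per-stage cache directory
    print(cache_id + ".info")   # pickled stage info

Because the source digest and validation token are part of the name, a stage whose code or token changes simply misses the cache lookup and is re-executed; no separate bookkeeping file has to be kept consistent.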
diff --git a/tests/fixtures/devalidation/E.py b/tests/fixtures/devalidation/E.py
new file mode 100644
index 0000000..01259f7
--- /dev/null
+++ b/tests/fixtures/devalidation/E.py
@@ -0,0 +1,11 @@
+def configure(context):
+    context.stage("tests.fixtures.devalidation.D", config={"d": 10}, alias="d")
+    context.stage("tests.fixtures.devalidation.C")
+    context.stage("tests.fixtures.devalidation.E1")
+    context.stage("tests.fixtures.devalidation.E2")
+
+def execute(context):
+    context.stage("tests.fixtures.devalidation.E1")
+    context.stage("tests.fixtures.devalidation.E2")
+    context.stage("d")
+    return context.stage("tests.fixtures.devalidation.C")
diff --git a/tests/fixtures/devalidation/E1.py b/tests/fixtures/devalidation/E1.py
new file mode 100644
index 0000000..1ae5e75
--- /dev/null
+++ b/tests/fixtures/devalidation/E1.py
@@ -0,0 +1,5 @@
+def configure(context):
+    pass
+
+def execute(context):
+    return 20
diff --git a/tests/fixtures/devalidation/E2.py b/tests/fixtures/devalidation/E2.py
new file mode 100644
index 0000000..9a65ecc
--- /dev/null
+++ b/tests/fixtures/devalidation/E2.py
@@ -0,0 +1,5 @@
+def configure(context):
+    pass
+
+def execute(context):
+    return 40
diff --git a/tests/fixtures/ephemeral/E.py b/tests/fixtures/ephemeral/E.py
new file mode 100644
index 0000000..24bd06a
--- /dev/null
+++ b/tests/fixtures/ephemeral/E.py
@@ -0,0 +1,5 @@
+def configure(context):
+    context.stage("tests.fixtures.ephemeral.D")
+
+def execute(context):
+    pass
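With pipeline.json gone, step 4.6 ("devalidate if a parent has been updated") is now decided by comparing the modification times of the cache files themselves, which is why the tests below insert time.sleep(0.1) between consecutive runs. A simplified sketch of that comparison (hypothetical helper, assuming both cache files exist):

    import os

    def parent_is_newer(parent_cache_file, child_cache_file):
        # Mirrors step 4.6 above: a stage is stale as soon as any
        # ancestor's cache file was written after the stage's own
        return os.stat(parent_cache_file).st_mtime_ns > os.stat(child_cache_file).st_mtime_ns

On file systems with coarse timestamp resolution, two caches written in quick succession can receive identical mtimes, so the short sleeps make the ordering between runs unambiguous.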
"tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + time.sleep(0.1) result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.C" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -72,6 +74,7 @@ def test_devalidate_by_parent(tmpdir): assert not "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + time.sleep(0.1) result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.A2" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -81,6 +84,7 @@ def test_devalidate_by_parent(tmpdir): assert not "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert not "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + time.sleep(0.1) result = synpp.run([{ "descriptor": "tests.fixtures.devalidation.C" }], config = { "a": 1 }, working_directory = working_directory, verbose = True) @@ -113,6 +117,38 @@ def test_devalidate_descendants(tmpdir): assert "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] assert "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] +import os +import sys +def test_devalidation_stability(tmpdir): + temp_directory = tmpdir.mkdir("sub") + working_directory = os.path.join(temp_directory, "cache") + config_yml_path = os.path.join(temp_directory, "config.yml") + config_yml = f"""run: + - tests.fixtures.devalidation.E + +working_directory: {working_directory} + +config: + a: 1""" + + with open(config_yml_path, "w") as f: + f.write(config_yml) + + os.system(f"python -m synpp {config_yml_path}") + + result = synpp.run([{ + "descriptor": "tests.fixtures.devalidation.E" + }], config = { "a": 1 }, working_directory = working_directory, verbose = True) + + assert not "tests.fixtures.devalidation.A1__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert not "tests.fixtures.devalidation.A2" in result["stale"] + assert not "tests.fixtures.devalidation.B__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert not "tests.fixtures.devalidation.C__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert "tests.fixtures.devalidation.E__42b7b4f2921788ea14dac5566e6f06d0" in result["stale"] + assert not "tests.fixtures.devalidation.E1" in result["stale"] + assert not "tests.fixtures.devalidation.E2" in result["stale"] + + def test_devalidate_token(tmpdir): working_directory = tmpdir.mkdir("sub") path = "%s/test.fixture" % working_directory diff --git a/tests/test_ephemeral.py b/tests/test_ephemeral.py index dfea774..db0012a 100644 --- a/tests/test_ephemeral.py +++ b/tests/test_ephemeral.py @@ -85,3 +85,24 @@ def test_ephemeral_BD(tmpdir): assert not "tests.fixtures.ephemeral.B" in result["stale"] assert "tests.fixtures.ephemeral.C" in result["stale"] assert "tests.fixtures.ephemeral.D" in result["stale"] + +def test_ephemeral_E(tmpdir): + working_directory = tmpdir.mkdir("cache") + + result = synpp.run([ + { "descriptor": "tests.fixtures.ephemeral.E" }, + ], working_directory = working_directory, verbose = True) + + assert "tests.fixtures.ephemeral.A" in result["stale"] + assert "tests.fixtures.ephemeral.C" in result["stale"] + assert "tests.fixtures.ephemeral.D" in result["stale"] + assert "tests.fixtures.ephemeral.E" in 
result["stale"] + + result = synpp.run([ + { "descriptor": "tests.fixtures.ephemeral.E" }, + ], working_directory = working_directory, verbose = True) + + assert not "tests.fixtures.ephemeral.A" in result["stale"] + assert not "tests.fixtures.ephemeral.C" in result["stale"] + assert not "tests.fixtures.ephemeral.D" in result["stale"] + assert "tests.fixtures.ephemeral.E" in result["stale"] \ No newline at end of file