Avoid monolithic state file #81

Open · wants to merge 22 commits into develop
232 changes: 97 additions & 135 deletions src/synpp/pipeline.py
Contributor
Hi, sorry for the long delay. I tested it with a few scenarios and it looks good. It's nice not to have the monolithic file anymore. I marked one part in the code: why do we need these explicit validation tokens added to the cache folders / files? Currently this creates some strange file names with spaces (here for IDF):

[screenshot: cache files whose names contain spaces]

Could this cause problems on other operating systems?
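For reference, here is a minimal sketch of how the new per-stage cache ids turn into file names. The stage id, source digest and validation token below are made up; only the concatenation scheme comes from this PR:

```python
import hashlib

# Illustrative values only; the naming scheme (prefix + "__" + token) mirrors this PR.
def get_cache_prefix(stage_id, source_code):
    return stage_id + "__" + hashlib.md5(source_code.encode()).hexdigest()

stage_id = "synthesis.population.enriched"    # hypothetical stage name
source_digest = "abc123"                      # stands in for the concatenated module hashes
validation_token = "2021-06-30 08:15:00"      # e.g. a stringified timestamp returned by validate()

cache_id = get_cache_prefix(stage_id, source_digest) + "__" + str(validation_token)
print("%s/%s.p" % ("working_directory", cache_id))
# working_directory/synthesis.population.enriched__<md5>__2021-06-30 08:15:00.p
```

Whatever str() produces for the validation token, spaces included, ends up verbatim in the file name, which is presumably what produces the names in the screenshot.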

@@ -221,6 +221,18 @@ def hash_name(name, config):
else:
return name

def get_cache_prefix(stage_id, source_code):
cache_prefix = stage_id + "__" + hashlib.md5(source_code.encode()).hexdigest()
return cache_prefix

def get_cache_directory_path(working_directory, cache_id):
return "%s/%s.cache" % (working_directory, cache_id)

def get_cache_file_path(working_directory, cache_id):
return "%s/%s.p" % (working_directory, cache_id)

def get_info_path(working_directory, cache_id):
return "%s/%s.info" % (working_directory, cache_id)

class ConfiguredStage:
def __init__(self, instance, config, configuration_context):
@@ -310,7 +322,7 @@ def path(self):


class ExecuteContext(Context):
def __init__(self, required_config, required_stages, aliases, working_directory, dependencies, cache_path, pipeline_config, logger, cache, dependency_info):
def __init__(self, required_config, required_stages, aliases, working_directory, dependencies, cache_path, pipeline_config, logger, cache, dependency_info, cache_ids):
self.required_config = required_config
self.working_directory = working_directory
self.dependencies = dependencies
@@ -323,6 +335,7 @@ def __init__(self, required_config, required_stages, aliases, working_directory,
self.dependency_info = dependency_info
self.aliases = aliases
self.required_stages = required_stages
self.cache_ids = cache_ids

self.progress_context = None

@@ -339,7 +352,7 @@ def stage(self, name, config = {}):
return self.cache[dependency]
else:
if not dependency in self.dependency_cache:
with open("%s/%s.p" % (self.working_directory, dependency), "rb") as f:
with open(get_cache_file_path(self.working_directory, self.cache_ids[dependency]), "rb") as f:
self.logger.info("Loading cache for %s ..." % dependency)
self.dependency_cache[dependency] = pickle.load(f)

@@ -353,7 +366,7 @@ def path(self, name = None, config = {}):
return self.cache_path

dependency = self._get_dependency({ "descriptor": name, "config": config })
return "%s/%s.cache" % (self.working_directory, dependency)
return get_cache_directory_path(self.working_directory, self.cache_ids[dependency])

def set_info(self, name, value):
self.stage_info[name] = value
@@ -561,16 +574,6 @@ def process_stages(definitions, global_config, externals={}, aliases={}):
return registry


def update_json(meta, working_directory):
if os.path.exists("%s/pipeline.json" % working_directory):
shutil.move("%s/pipeline.json" % working_directory, "%s/pipeline.json.bk" % working_directory)

with open("%s/pipeline.json.new" % working_directory, "w+") as f:
json.dump(meta, f)

shutil.move("%s/pipeline.json.new" % working_directory, "%s/pipeline.json" % working_directory)


def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False,
logger = logging.getLogger("synpp"), rerun_required=True, ensure_working_directory=False,
externals = {}, aliases = {}):
@@ -637,16 +640,24 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non

sorted_hashes = list(nx.topological_sort(graph))

# Concatenate source digest of dependencies
source_digests = {}
for hash in sorted_hashes:
source_digests[hash] = ""
for dependency_hash in sorted(nx.ancestors(graph, hash)):
source_digests[hash] += registry[dependency_hash]["wrapper"].module_hash
source_digests[hash] += registry[hash]["wrapper"].module_hash

# Check where cache is available
cache_available = set()
cache_available = {}

if not working_directory is None:
for hash in sorted_hashes:
directory_path = "%s/%s.cache" % (working_directory, hash)
file_path = "%s/%s.p" % (working_directory, hash)
prefix = get_cache_prefix(hash, source_digests[hash])
prefixed = [filename[:-2] for filename in os.listdir(working_directory) if filename.startswith(prefix) and filename.endswith(".p")]

if os.path.exists(directory_path) and os.path.exists(file_path):
cache_available.add(hash)
if prefixed:
cache_available[hash] = [filename.split("__")[-1] for filename in prefixed]
registry[hash]["ephemeral"] = False

# Set up ephemeral stage counts
@@ -662,122 +673,84 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non

ephemeral_counts[hash] += 1

# 3) Load information about stages
meta = {}

if not working_directory is None:
try:
with open("%s/pipeline.json" % working_directory) as f:
meta = json.load(f)
logger.info("Found pipeline metadata in %s/pipeline.json" % working_directory)
except FileNotFoundError:
logger.info("Did not find pipeline metadata in %s/pipeline.json" % working_directory)

# 4) Devalidate stages
sorted_cached_hashes = sorted_hashes - ephemeral_counts.keys()
stale_hashes = set()

# Get current validation tokens
current_validation_tokens = {
stage_id:
str(
registry[stage_id]["wrapper"].validate(
ValidateContext(registry[stage_id]["config"], get_cache_directory_path(working_directory, stage_id))
)
) for stage_id in sorted_hashes
}

# Cache mapper between stage id and cache id.
cache_ids = {stage_id: get_cache_prefix(stage_id, source_digests[stage_id]) + "__" + str(current_validation_tokens[stage_id]) for stage_id in sorted_hashes}
Comment on lines +680 to +691
Contributor
Why do we need this "__" + validation_token suffix? Couldn't it just be included in the hash, e.g. by hashing the output of get_cache_prefix(...) together with the validation token?
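A minimal sketch of that alternative (not part of this PR; names are illustrative): fold the validation token into the digest so it never appears verbatim in the file name.

```python
import hashlib

# Sketch of the suggestion above: hash the source digest and the validation
# token together instead of appending the raw token to the cache id.
def get_cache_id(stage_id, source_digest, validation_token):
    payload = (source_digest + "__" + str(validation_token)).encode()
    return stage_id + "__" + hashlib.md5(payload).hexdigest()
```

The staleness check in step 4.8 would then presumably compare full cache ids instead of parsing the token back out of the file name suffix.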

file_cache_paths = {stage_id: get_cache_file_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()}
dir_cache_paths = {stage_id: get_cache_directory_path(working_directory, cache_id) for stage_id, cache_id in cache_ids.items()}

# 4.1) Devalidate if they are required (optional, otherwise will reload from cache)
if rerun_required:
stale_hashes.update(required_hashes)

# 4.2) Devalidate if not in meta
for hash in sorted_cached_hashes:
if not hash in meta:
stale_hashes.add(hash)

# 4.3) Devalidate if configuration values have changed
# This devalidation step is obsolete since we have implicit config parameters

# 4.4) Devalidate if module hash of a stage has changed
for hash in sorted_cached_hashes:
if hash in meta:
if not "module_hash" in meta[hash]:
stale_hashes.add(hash) # Backwards compatibility

else:
previous_module_hash = meta[hash]["module_hash"]
current_module_hash = registry[hash]["wrapper"].module_hash

if previous_module_hash != current_module_hash:
stale_hashes.add(hash)
if working_directory is None:
# If no working_directory, devalidate all dependencies
for hash in list(stale_hashes):
for dependency_hash in nx.ancestors(graph, hash):
stale_hashes.add(dependency_hash)

# 4.5) Devalidate if cache is not existant
if not working_directory is None:
else:
# 4.5) Devalidate if cache is not existant
for hash in sorted_cached_hashes:
directory_path = "%s/%s.cache" % (working_directory, hash)
file_path = "%s/%s.p" % (working_directory, hash)

if not hash in cache_available:
stale_hashes.add(hash)

# 4.6) Devalidate if parent has been updated
for hash in sorted_cached_hashes:
if not hash in stale_hashes and hash in meta:
for dependency_hash, dependency_update in meta[hash]["dependencies"].items():
if not dependency_hash in meta:
stale_hashes.add(hash)
else:
if meta[dependency_hash]["updated"] > dependency_update:
stale_hashes.add(hash)

# 4.7) Devalidate if parents are not the same anymore
for hash in sorted_cached_hashes:
if not hash in stale_hashes and hash in meta:
cached_hashes = set(meta[hash]["dependencies"].keys())
current_hashes = set(registry[hash]["dependencies"] if "dependencies" in registry[hash] else [])

if not cached_hashes == current_hashes:
# 4.8) Manually devalidate stages
for hash in sorted_cached_hashes:
if hash in cache_available and current_validation_tokens[hash] not in cache_available[hash]:
stale_hashes.add(hash)

# 4.8) Manually devalidate stages
for hash in sorted_cached_hashes:
stage = registry[hash]
cache_path = "%s/%s.cache" % (working_directory, hash)
context = ValidateContext(stage["config"], cache_path)

validation_token = stage["wrapper"].validate(context)
existing_token = meta[hash]["validation_token"] if hash in meta and "validation_token" in meta[hash] else None

if not validation_token == existing_token:
stale_hashes.add(hash)

# 4.9) Devalidate descendants of devalidated stages
for hash in set(stale_hashes):
for descendant_hash in nx.descendants(graph, hash):
if not descendant_hash in stale_hashes:
stale_hashes.add(descendant_hash)

# 4.10) Devalidate ephemeral stages if necessary
pending = set(stale_hashes)

while len(pending) > 0:
for dependency_hash in registry[pending.pop()]["dependencies"]:
if registry[dependency_hash]["ephemeral"]:
if not dependency_hash in stale_hashes:
pending.add(dependency_hash)

stale_hashes.add(dependency_hash)
# 4.6) Devalidate if parent has been updated
for hash in sorted_cached_hashes:
if not hash in stale_hashes:
ctime = os.stat(file_cache_paths[hash]).st_mtime_ns
for dependency_hash in nx.ancestors(graph, hash):
if dependency_hash not in stale_hashes and dependency_hash in cache_available:
dependency_ctime = os.stat(file_cache_paths[dependency_hash]).st_mtime_ns
if dependency_ctime > ctime:
stale_hashes.add(hash)
break

# 4.9) Devalidate descendants of devalidated stages
for hash in set(stale_hashes):
for descendant_hash in nx.descendants(graph, hash):
if not descendant_hash in stale_hashes:
stale_hashes.add(descendant_hash)

# 4.10) Devalidate ephemeral stages if necessary
pending = set(stale_hashes)

while len(pending) > 0:
for dependency_hash in registry[pending.pop()]["dependencies"]:
if registry[dependency_hash]["ephemeral"]:
if not dependency_hash in stale_hashes:
pending.add(dependency_hash)

stale_hashes.add(dependency_hash)

logger.info("Devalidating %d stages:" % len(stale_hashes))
for hash in stale_hashes: logger.info("- %s" % hash)

# 5) Reset meta information
for hash in stale_hashes:
if hash in meta:
del meta[hash]

if not working_directory is None:
update_json(meta, working_directory)

logger.info("Successfully reset meta data")

# 6) Execute stages
results = [None] * len(definitions)
cache = {}

progress = 0

infos = {}
for hash in sorted_hashes:
if hash in stale_hashes:
logger.info("Executing stage %s ..." % hash)
@@ -794,46 +767,35 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
# stage_dependency_info[parent] = meta[parent]["info"]
#stage_dependencies =

stage_dependency_info = {}
for dependency_hash in stage["dependencies"]:
stage_dependency_info[dependency_hash] = meta[dependency_hash]["info"]
if dependency_hash not in infos and working_directory is not None:
with open(get_info_path(working_directory, cache_ids[dependency_hash]), "rb") as f:
infos[dependency_hash] = pickle.load(f)

# Prepare cache path
cache_path = "%s/%s.cache" % (working_directory, hash)
cache_path = dir_cache_paths[hash]

if not working_directory is None:
if os.path.exists(cache_path):
rmtree(cache_path)
os.mkdir(cache_path)

context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, stage_dependency_info)
context = ExecuteContext(stage["config"], stage["required_stages"], stage["aliases"], working_directory, stage["dependencies"], cache_path, pipeline_config, logger, cache, infos, cache_ids)
result = stage["wrapper"].execute(context)
validation_token = stage["wrapper"].validate(ValidateContext(stage["config"], cache_path))

if hash in required_hashes:
results[required_hashes.index(hash)] = result

if working_directory is None:
cache[hash] = result
else:
with open("%s/%s.p" % (working_directory, hash), "wb+") as f:
with open(file_cache_paths[hash], "wb+") as f:
logger.info("Writing cache for %s" % hash)
pickle.dump(result, f, protocol=4)

# Update meta information
meta[hash] = {
"config": stage["config"],
"updated": datetime.datetime.utcnow().timestamp(),
"dependencies": {
dependency_hash: meta[dependency_hash]["updated"] for dependency_hash in stage["dependencies"]
},
"info": context.stage_info,
"validation_token": validation_token,
"module_hash": stage["wrapper"].module_hash
}

if not working_directory is None:
update_json(meta, working_directory)
with open(get_info_path(working_directory, cache_ids[hash]), "wb+") as f:
logger.info("Writing info for %s" % hash)
pickle.dump(context.stage_info, f, protocol=4)
infos[hash] = context.stage_info

# Clear cache for ephemeral stages if they are no longer needed
if not working_directory is None:
@@ -842,8 +804,8 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
ephemeral_counts[dependency_hash] -= 1

if ephemeral_counts[dependency_hash] == 0:
cache_directory_path = "%s/%s.cache" % (working_directory, dependency_hash)
cache_file_path = "%s/%s.p" % (working_directory, dependency_hash)
cache_directory_path = dir_cache_paths[dependency_hash]
cache_file_path = file_cache_paths[dependency_hash]

rmtree(cache_directory_path)
os.remove(cache_file_path)
Expand All @@ -862,15 +824,15 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
# Load remaining previously cached results
for hash in required_hashes:
if results[required_hashes.index(hash)] is None:
with open("%s/%s.p" % (working_directory, hash), "rb") as f:
with open(file_cache_paths[hash], "rb") as f:
logger.info("Loading cache for %s ..." % hash)
results[required_hashes.index(hash)] = pickle.load(f)

if verbose:
info = {}

for hash in sorted(meta.keys()):
info.update(meta[hash]["info"])
for hash in infos.keys():
info.update(infos[hash])

return {
"results": results,
11 changes: 11 additions & 0 deletions tests/fixtures/devalidation/E.py
@@ -0,0 +1,11 @@
def configure(context):
context.stage("tests.fixtures.devalidation.D", config={"d": 10}, alias="d")
context.stage("tests.fixtures.devalidation.C")
context.stage("tests.fixtures.devalidation.E1")
context.stage("tests.fixtures.devalidation.E2")

def execute(context):
context.stage("tests.fixtures.devalidation.E1")
context.stage("tests.fixtures.devalidation.E2")
context.stage("d")
return context.stage("tests.fixtures.devalidation.C")
5 changes: 5 additions & 0 deletions tests/fixtures/devalidation/E1.py
@@ -0,0 +1,5 @@
def configure(context):
pass

def execute(context):
return 20
5 changes: 5 additions & 0 deletions tests/fixtures/devalidation/E2.py
@@ -0,0 +1,5 @@
def configure(context):
pass

def execute(context):
return 40
5 changes: 5 additions & 0 deletions tests/fixtures/ephemeral/E.py
@@ -0,0 +1,5 @@
def configure(context):
context.stage("tests.fixtures.ephemeral.D")

def execute(context):
pass