From 9a03146ef3414ad8ff8b4cf0318faa1b307d4d17 Mon Sep 17 00:00:00 2001 From: Leif Johansson Date: Tue, 10 Dec 2019 22:04:36 +0100 Subject: [PATCH] support for macros and scopes --- examples/batch-mdq-loop.fd | 12 +++++------- src/pyff/builtins.py | 16 ++++++++++++++-- src/pyff/pipes.py | 14 +++++++++++++- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/examples/batch-mdq-loop.fd b/examples/batch-mdq-loop.fd index 63e1e97a..4b8356c0 100644 --- a/examples/batch-mdq-loop.fd +++ b/examples/batch-mdq-loop.fd @@ -1,4 +1,5 @@ - when sign: + - drop_xsi_type: - finalize: cacheDuration: PT5H validUntil: P10D @@ -12,16 +13,13 @@ - http://fed.openathens.net/oafed/metadata - http://edugain.cdn.samlbits.net - select: + - fork: + - then sign: + - break - map: - log_entity: - fork: - - drop_xsi_type: - - finalize: - cacheDuration: PT5H - validUntil: P10D - - sign: - key: sign.key - cert: sign.crt + - then sign: - publish: output: /tmp/mdq/entities hash_link: true diff --git a/src/pyff/builtins.py b/src/pyff/builtins.py index 61a1e572..27a97043 100644 --- a/src/pyff/builtins.py +++ b/src/pyff/builtins.py @@ -81,6 +81,7 @@ def _p(e): ip = Plumbing(pipeline=req.args, pid="{}.each[{}]".format(req.plumbing.pid, entity_id)) ireq = Plumbing.Request(ip, req.md, t=e, scheduler=req.scheduler) ireq.set_id(entity_id) + ireq.set_parent(req) return ip.iprocess(ireq) from multiprocessing.pool import ThreadPool @@ -89,6 +90,16 @@ def _p(e): log.info("processed {} entities".format(len(result))) +@pipe(name="then") +def _then(req, *opts): + """ + Call a named 'when' clause and return - akin to macro invocations for pyFF + """ + for cb in [PipelineCallback(p, req, store=req.md.store) for p in opts]: + req.t = cb(req.t) + return req.t + + @pipe(name="log_entity") def _log_entity(req, *opts): """ @@ -221,6 +232,7 @@ def fork(req, *opts): ip = Plumbing(pipeline=req.args, pid="%s.fork" % req.plumbing.pid) ireq = Plumbing.Request(ip, req.md, t=nt, scheduler=req.scheduler) ireq.set_id(req.id) + ireq.set_parent(req) ip.iprocess(ireq) if req.t is not None and ireq.t is not None and len(root(ireq.t)) > 0: @@ -608,10 +620,10 @@ def load(req, *opts): params['verify'] = elt if params['via'] is not None: - params['via'] = [PipelineCallback(pipe, req, store=req.md.store) for pipe in params['via']] + params['via'] = [PipelineCallback(p, req, store=req.md.store) for p in params['via']] if params['cleanup'] is not None: - params['cleanup'] = [PipelineCallback(pipe, req, store=req.md.store) for pipe in params['cleanup']] + params['cleanup'] = [PipelineCallback(p, req, store=req.md.store) for p in params['cleanup']] params.update(opts) diff --git a/src/pyff/pipes.py b/src/pyff/pipes.py index bfe5b6a3..bbcda557 100644 --- a/src/pyff/pipes.py +++ b/src/pyff/pipes.py @@ -114,7 +114,7 @@ class PipelineCallback(object): def __init__(self, entry_point, req, store=None): self.entry_point = entry_point - self.plumbing = Plumbing(req.plumbing.pipeline, "%s-via-%s" % (req.plumbing.id, entry_point)) + self.plumbing = Plumbing(req.scope_of(entry_point).plumbing.pipeline, "%s-via-%s" % (req.plumbing.id, entry_point)) self.req = req self.store = store @@ -213,6 +213,15 @@ def __init__(self, pl, md, t=None, name=None, args=None, state=None, store=None, self.scheduler = scheduler self.raise_exceptions = raise_exceptions self.exception = None + self.parent = None + + def scope_of(self, entry_point): + if 'with {}'.format(entry_point) in self.plumbing.pipeline: + return self + elif self.parent is None: + return self + else: + return self.parent.scope_of(entry_point) @property def id(self): @@ -227,6 +236,9 @@ def id(self): def set_id(self, _id): self._id = _id + def set_parent(self, _parent): + self.parent = _parent + @property def store(self): if self._store: