-
Notifications
You must be signed in to change notification settings - Fork 2
Home
As an example - albeit a bad one as it doesn't highlight the good side of asyncio
, Lets set up a pipeline to simply reverse a string
Lets define it in a class to group related coros together
from pipelines.processor import Processor
from pipelines.plumber import Plumber
import asyncio
class StringFuncs:
@classmethod
async def reverse(cls, self:Processor=None, q_elt:tuple=None):
return q_elt[0][::-1]
# This coro shall be called as StringFuncs.reverse
The arguments self
and q_elt
have to be there in the coroutine signature as the first two arguments. They contain the references to the Processor
instance the coro is associated with and the tuple containing the input to the coro respectively.
We need a way to provide input to the pipeline. This is achieved by creating a node of type InputProcessor
. However, the developer does not need to worry about this. The only change that we need to keep in mind is the coroutine signature.
Let's define an input coroutine that generates random hex strings and add it to the StringFuncs
class.
class StringFuncs:
...
@classmethod
async def input_coro(cls, self:Processor=None, output_q:asyncio.Queue=None):
# This coroutine generates 20 random input strings and populates the
# output_q of whatever node it runs on.
import uuid
acc = [ str(uuid.uuid4()) for _ in range(20) ]
for i in acc:
await output_q.put(i)
# unnecessary but async
await asyncio.sleep(0)
class StringFuncs:
...
@classmethod
async def output_coro(cls, self, q_elt):
print('output~> ', q_elt)
The pipeline is represented by a graph like so -
input_d = {
'nodes': {
'inp': { 'coro': StringFuncs.input_coro },
'rev': { 'coro': StringFuncs.reverse },
'out': { 'coro': StringFuncs.output_coro },
},
'graph': {
'inp': ('rev', 'out'), # output of node 'inp' ~> 'rev' and 'out'
'rev': ('out', ), # and so on...
'out': None,
},
}
Now that the graph defining the pipeline is built, we need to instantiate it using the Plumber
. The Plumber
takes two arguments - the graph dict and a coro_map
which is basically a function that maps the coro
value in the nodes
dict to the appropriate function
object i.e. it maps
input_d['nodes']['inp']['coro'] ~> StringFuncs.input_coro
In our toy application, it can be trivially defined as -
coro_map = lambda x: x
And so we can build and run the pipeline as follows -
_t = Plumber(input_d, coro_map=lambda x: x)
_t.create_pipeline()
output~> ('afcaae36-213f-46ff-bdb0-ab417fef65c9', '9c56fef714ba-0bdb-ff64-f312-63eaacfa')
output~> ('81456b84-efb1-4791-baa9-c9555a70bfbd', 'dbfb07a5559c-9aab-1974-1bfe-48b65418')
output~> ('8a480d0f-6f3c-4733-92f9-ae5cfa1748d9', '9d8471afc5ea-9f29-3374-c3f6-f0d084a8')
...
The example code can be found in demos/readme_demo.py
Feel free to raise issues!
I'm looking for people who can help me backpressure test this 🚀