-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathworker.py
98 lines (80 loc) · 3.1 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta
from ipaddress import IPv4Address
from typing import List
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
SandboxedWorkflowRunner,
SandboxRestrictions,
)
# We always want to pass through external modules to the sandbox that we know
# are safe for workflow use
with workflow.unsafe.imports_passed_through():
from pydantic import BaseModel
from pydantic_converter.converter import pydantic_data_converter
# Sample payload model passed into the workflow and on to the activity. Its
# fields use types (an IP address and a datetime) that are handled here by the
# Pydantic data converter configured on the client in main().
class MyPydanticModel(BaseModel):
    # IPv4 address value carried by the model.
    some_ip: IPv4Address
    # Timestamp value carried by the model.
    some_date: datetime
@activity.defn
async def my_activity(models: List[MyPydanticModel]) -> List[MyPydanticModel]:
    """Log the models received by the activity and return them unchanged.

    Args:
        models: The models handed to the activity.

    Returns:
        The very same list that was passed in.
    """
    # Build the message up front, then hand the finished string to the logger.
    log_line = "Got models in activity: %s" % models
    activity.logger.info(log_line)
    return models
@workflow.defn
class MyWorkflow:
    """Workflow that logs its input models and round-trips them through my_activity."""

    @workflow.run
    async def run(self, models: List[MyPydanticModel]) -> List[MyPydanticModel]:
        """Log the incoming models, then return whatever my_activity returns for them.

        Args:
            models: The models supplied as the workflow input.

        Returns:
            The result of executing ``my_activity`` with ``models``.
        """
        log_line = "Got models in workflow: %s" % models
        workflow.logger.info(log_line)

        # The activity gets one minute from start to close.
        activity_timeout = timedelta(minutes=1)
        return await workflow.execute_activity(
            my_activity,
            models,
            start_to_close_timeout=activity_timeout,
        )
# Due to known issues with Pydantic's use of issubclass and our inability to
# override the check in sandbox, Pydantic will think datetime is actually date
# in the sandbox. At the expense of protecting against datetime.now() use in
# workflows, we're going to remove datetime module restrictions. See sdk-python
# README's discussion of known sandbox issues for more details.
def new_sandbox_runner() -> SandboxedWorkflowRunner:
    """Create a sandboxed workflow runner without the ``datetime`` member restrictions.

    Starts from ``SandboxRestrictions.default`` and replaces its
    ``invalid_module_members`` with a copy of the default matcher whose
    ``children`` mapping no longer has a ``"datetime"`` entry.

    Returns:
        A ``SandboxedWorkflowRunner`` configured with the relaxed restrictions.

    Raises:
        KeyError: If the default children mapping has no ``"datetime"`` entry.
    """
    # TODO(cretz): Use with_child_unrestricted when https://github.com/temporalio/sdk-python/issues/254
    # is fixed and released
    default_members = SandboxRestrictions.invalid_module_members_default

    # Work on a shallow copy so the default children mapping is not mutated.
    relaxed_children = dict(default_members.children)
    relaxed_children.pop("datetime")

    relaxed_members = dataclasses.replace(
        default_members, children=relaxed_children
    )
    relaxed_restrictions = dataclasses.replace(
        SandboxRestrictions.default, invalid_module_members=relaxed_members
    )
    return SandboxedWorkflowRunner(restrictions=relaxed_restrictions)
# Module-level shutdown signal: main() waits on this event once the worker is
# running, and the __main__ KeyboardInterrupt handler sets it.
interrupt_event = asyncio.Event()
async def main():
    """Connect to the server and run the sample worker until interrupted.

    Configures INFO-level logging, connects a client to ``localhost:7233``
    with the Pydantic data converter, and runs a worker for ``MyWorkflow`` and
    ``my_activity`` on the ``pydantic_converter-task-queue`` task queue. The
    coroutine stays inside the worker context until ``interrupt_event`` is set.
    """
    logging.basicConfig(level=logging.INFO)

    # The client carries the Pydantic converter.
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )

    # Build the worker first, then enter its context to run it.
    worker = Worker(
        client,
        task_queue="pydantic_converter-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
        workflow_runner=new_sandbox_runner(),
    )
    async with worker:
        print("Worker started, ctrl+c to exit")
        # Park here until the interrupt handler sets the event.
        await interrupt_event.wait()
        print("Shutting down")
if __name__ == "__main__":
    # Script entry point: run main() on a dedicated event loop and turn Ctrl+C
    # into a shutdown request via interrupt_event.
    loop = asyncio.new_event_loop()
    # Keep a handle on the task so it can be resumed after an interrupt.
    main_task = loop.create_task(main())
    try:
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        # The interrupt stops the loop but normally leaves main() suspended on
        # interrupt_event.wait(). Wake it and keep running the loop until it
        # returns, so the `async with Worker(...)` block in main() is exited
        # instead of being abandoned part-way through.
        interrupt_event.set()
        # If the interrupt was raised inside the task itself, the task is
        # already finished and there is nothing left to drive.
        if not main_task.done():
            loop.run_until_complete(main_task)
    finally:
        # Always finalize async generators and release the loop, including
        # when main() fails with an ordinary exception.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()