Skip to content

Commit

Permalink
Merge RunResult and DeployResult
Browse files Browse the repository at this point in the history
  • Loading branch information
erikbern committed Jan 27, 2025
1 parent 2343d6d commit 20a7519
Showing 1 changed file with 44 additions and 56 deletions.
100 changes: 44 additions & 56 deletions modal/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,13 @@


@dataclasses.dataclass(frozen=True)
class DeployResult:
class AppResult:
"""Dataclass representing the result of deploying an app."""

app_id: str
app_page_url: str
app_logs_url: str
warnings: list[str]


@dataclasses.dataclass(frozen=True)
class RunResult:
"""Dataclass representing the result of running an app."""

# TODO(erikbern): merge with DeployResult?

app_id: str
app_page_url: str
app_logs_url: Optional[str] = None
# warnings: list[str]
warnings: list[str] = dataclasses.field(default_factory=list)


async def _heartbeat(client: _Client, app_id: str) -> None:
Expand All @@ -77,7 +65,7 @@ async def _heartbeat(client: _Client, app_id: str) -> None:

async def _init_local_app_existing(
client: _Client, existing_app_id: str, environment_name: str
) -> tuple[RunningApp, RunResult]:
) -> tuple[RunningApp, AppResult]:
# Get all the objects first
obj_req = api_pb2.AppGetLayoutRequest(app_id=existing_app_id)
obj_resp, _ = await gather_cancel_on_exc(
Expand All @@ -87,16 +75,16 @@ async def _init_local_app_existing(
)
app_page_url = f"https://modal.com/apps/{existing_app_id}" # TODO (elias): this should come from the backend
running_app: RunningApp = RunningApp(obj_resp.app_layout)
run_result: RunResult = RunResult(existing_app_id, app_page_url=app_page_url)
return (running_app, run_result)
app_result: AppResult = AppResult(existing_app_id, app_page_url=app_page_url)
return (running_app, app_result)


async def _init_local_app_new(
client: _Client,
description: str,
app_state: int, # ValueType
environment_name: str = "",
) -> tuple[RunningApp, RunResult]:
) -> tuple[RunningApp, AppResult]:
app_req = api_pb2.AppCreateRequest(
description=description,
environment_name=environment_name,
Expand All @@ -109,20 +97,20 @@ async def _init_local_app_new(
)
logger.debug(f"Created new app with id {app_resp.app_id}")
running_app: RunningApp = RunningApp()
run_result: RunResult = RunResult(
app_result: AppResult = AppResult(
app_resp.app_id,
app_page_url=app_resp.app_page_url,
app_logs_url=app_resp.app_logs_url,
)
return (running_app, run_result)
return (running_app, app_result)


async def _init_local_app_from_name(
client: _Client,
name: str,
namespace: Any,
environment_name: str = "",
) -> tuple[RunningApp, RunResult]:
) -> tuple[RunningApp, AppResult]:
# Look up any existing deployment
app_req = api_pb2.AppGetByDeploymentNameRequest(
name=name,
Expand Down Expand Up @@ -199,7 +187,7 @@ async def _load(tag, obj):
async def _publish_app(
client: _Client,
running_app: RunningApp,
run_result: RunResult,
app_result: AppResult,
app_state: int, # api_pb2.AppState.value
functions: dict[str, _Function],
classes: dict[str, _Cls],
Expand All @@ -211,7 +199,7 @@ async def _publish_app(
definition_ids = {obj.object_id: obj._get_metadata().definition_id for obj in functions.values()} # type: ignore

request = api_pb2.AppPublishRequest(
app_id=run_result.app_id,
app_id=app_result.app_id,
name=name,
deployment_tag=tag,
app_state=app_state, # type: ignore : should be a api_pb2.AppState.value
Expand Down Expand Up @@ -322,23 +310,23 @@ async def _run_app(
interactive = False

running_app: RunningApp
run_result: RunResult
running_app, run_result = await _init_local_app_new(
app_result: AppResult
running_app, app_result = await _init_local_app_new(
client,
app.description or "",
environment_name=environment_name or "",
app_state=app_state,
)

logs_timeout = config["logs_timeout"]
async with app._set_local_app(client, running_app, run_result.app_id, interactive), TaskContext(
async with app._set_local_app(client, running_app, app_result.app_id, interactive), TaskContext(
grace=logs_timeout
) as tc:
# Start heartbeats loop to keep the client alive
# we don't log heartbeat exceptions in detached mode
# as losing the local connection will not affect the running app
def heartbeat():
return _heartbeat(client, run_result.app_id)
return _heartbeat(client, app_result.app_id)

tc.infinite_loop(heartbeat, sleep=HEARTBEAT_INTERVAL, log_exception=not detach)
logs_loop: Optional[asyncio.Task] = None
Expand All @@ -351,32 +339,32 @@ def heartbeat():

with output_mgr.make_live(output_mgr.step_progress("Initializing...")):
initialized_msg = (
f"Initialized. [grey70]View run at [underline]{run_result.app_page_url}[/underline][/grey70]"
f"Initialized. [grey70]View run at [underline]{app_result.app_page_url}[/underline][/grey70]"
)
output_mgr.print(output_mgr.step_completed(initialized_msg))
output_mgr.update_app_page_url(run_result.app_page_url or "ERROR:NO_APP_PAGE")
output_mgr.update_app_page_url(app_result.app_page_url or "ERROR:NO_APP_PAGE")

# Start logs loop

logs_loop = tc.create_task(
get_app_logs_loop(client, output_mgr, app_id=run_result.app_id, app_logs_url=run_result.app_logs_url)
get_app_logs_loop(client, output_mgr, app_id=app_result.app_id, app_logs_url=app_result.app_logs_url)
)

try:
# Create all members
await _create_all_objects(client, running_app, app, run_result.app_id, environment_name)
await _create_all_objects(client, running_app, app, app_result.app_id, environment_name)

# Publish the app
await _publish_app(client, running_app, run_result, app_state, app._functions, app._classes)
await _publish_app(client, running_app, app_result, app_state, app._functions, app._classes)
except asyncio.CancelledError as e:
# this typically happens on sigint/ctrl-C during setup (the KeyboardInterrupt happens in the main thread)
if output_mgr := _get_output_manager():
output_mgr.print("Aborting app initialization...\n")

await _status_based_disconnect(client, run_result.app_id, e)
await _status_based_disconnect(client, app_result.app_id, e)
raise
except BaseException as e:
await _status_based_disconnect(client, run_result.app_id, e)
await _status_based_disconnect(client, app_result.app_id, e)
raise

try:
Expand All @@ -392,26 +380,26 @@ def heartbeat():
else:
yield app
# successful completion!
await _status_based_disconnect(client, run_result.app_id, exc_info=None)
await _status_based_disconnect(client, app_result.app_id, exc_info=None)
except KeyboardInterrupt as e:
# this happens only if sigint comes in during the yield block above
if detach:
if output_mgr := _get_output_manager():
output_mgr.print(output_mgr.step_completed("Shutting down Modal client."))
output_mgr.print(
"The detached app keeps running. You can track its progress at: "
f"[magenta]{run_result.app_page_url}[/magenta]"
f"[magenta]{app_result.app_page_url}[/magenta]"
""
)
if logs_loop:
logs_loop.cancel()
await _status_based_disconnect(client, run_result.app_id, e)
await _status_based_disconnect(client, app_result.app_id, e)
else:
if output_mgr := _get_output_manager():
output_mgr.print(
"Disconnecting from Modal - This will terminate your Modal app in a few seconds.\n"
)
await _status_based_disconnect(client, run_result.app_id, e)
await _status_based_disconnect(client, app_result.app_id, e)
if logs_loop:
try:
await asyncio.wait_for(logs_loop, timeout=logs_timeout)
Expand All @@ -422,13 +410,13 @@ def heartbeat():
output_mgr.print(
output_mgr.step_completed(
"App aborted. "
f"[grey70]View run at [underline]{run_result.app_page_url}[/underline][/grey70]"
f"[grey70]View run at [underline]{app_result.app_page_url}[/underline][/grey70]"
)
)
return
except BaseException as e:
logger.info("Exception during app run")
await _status_based_disconnect(client, run_result.app_id, e)
await _status_based_disconnect(client, app_result.app_id, e)
raise

# wait for logs gracefully, even though the task context would do the same
Expand All @@ -443,7 +431,7 @@ def heartbeat():
if output_mgr := _get_output_manager():
output_mgr.print(
output_mgr.step_completed(
f"App completed. [grey70]View run at [underline]{run_result.app_page_url}[/underline][/grey70]"
f"App completed. [grey70]View run at [underline]{app_result.app_page_url}[/underline][/grey70]"
)
)

Expand All @@ -459,20 +447,20 @@ async def _serve_update(
client = await _Client.from_env()
try:
running_app: RunningApp
run_result: RunResult
running_app, run_result = await _init_local_app_existing(client, existing_app_id, environment_name)
app_result: AppResult
running_app, app_result = await _init_local_app_existing(client, existing_app_id, environment_name)

# Create objects
await _create_all_objects(
client,
running_app,
app,
run_result.app_id,
app_result.app_id,
environment_name,
)

# Publish the updated app
await _publish_app(client, running_app, run_result, api_pb2.APP_STATE_UNSPECIFIED, app._functions, app._classes)
await _publish_app(client, running_app, app_result, api_pb2.APP_STATE_UNSPECIFIED, app._functions, app._classes)

# Communicate to the parent process
is_ready.set()
Expand All @@ -488,7 +476,7 @@ async def _deploy_app(
client: Optional[_Client] = None,
environment_name: Optional[str] = None,
tag: str = "",
) -> DeployResult:
) -> AppResult:
"""Deploy an app and export its objects persistently.
Typically, using the command-line tool `modal deploy <module or script>`
Expand Down Expand Up @@ -540,15 +528,15 @@ async def _deploy_app(
t0 = time.time()

running_app: RunningApp
run_result: RunResult
running_app, run_result = await _init_local_app_from_name(
app_result: AppResult
running_app, app_result = await _init_local_app_from_name(
client, name, namespace, environment_name=environment_name
)

async with TaskContext(0) as tc:
# Start heartbeats loop to keep the client alive
def heartbeat():
return _heartbeat(client, run_result.app_id)
return _heartbeat(client, app_result.app_id)

tc.infinite_loop(heartbeat, sleep=HEARTBEAT_INTERVAL)

Expand All @@ -558,26 +546,26 @@ def heartbeat():
client,
running_app,
app,
run_result.app_id,
app_result.app_id,
environment_name=environment_name,
)

app_url, warnings = await _publish_app(
client, running_app, run_result, api_pb2.APP_STATE_DEPLOYED, app._functions, app._classes, name, tag
client, running_app, app_result, api_pb2.APP_STATE_DEPLOYED, app._functions, app._classes, name, tag
)
except Exception as e:
# Note that AppClientDisconnect only stops the app if it's still initializing, and is a no-op otherwise.
await _disconnect(client, run_result.app_id, reason=api_pb2.APP_DISCONNECT_REASON_DEPLOYMENT_EXCEPTION)
await _disconnect(client, app_result.app_id, reason=api_pb2.APP_DISCONNECT_REASON_DEPLOYMENT_EXCEPTION)
raise e

if output_mgr := _get_output_manager():
t = time.time() - t0
output_mgr.print(output_mgr.step_completed(f"App deployed in {t:.3f}s! 🎉"))
output_mgr.print(f"\nView Deployment: [magenta]{app_url}[/magenta]")
return DeployResult(
app_id=run_result.app_id,
app_page_url=run_result.app_page_url,
app_logs_url=run_result.app_logs_url, # type: ignore
return AppResult(
app_id=app_result.app_id,
app_page_url=app_result.app_page_url,
app_logs_url=app_result.app_logs_url, # type: ignore
warnings=[warning.message for warning in warnings],
)

Expand Down

0 comments on commit 20a7519

Please sign in to comment.