Skip to content

Commit

Permalink
RunningApp -> AppLayout
Browse files Browse the repository at this point in the history
  • Loading branch information
erikbern committed Jan 27, 2025
1 parent 20a7519 commit 7447327
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 57 deletions.
6 changes: 3 additions & 3 deletions modal/_container_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
_find_callables_for_obj,
_PartialFunctionFlags,
)
from modal.running_app import RunningApp
from modal.app_layout import AppLayout
from modal_proto import api_pb2

from ._runtime.container_io_manager import (
Expand Down Expand Up @@ -468,12 +468,12 @@ def main(container_args: api_pb2.ContainerArguments, client: Client):
batch_wait_ms = function_def.batch_linger_ms or 0

# Get ids and metadata for objects (primarily functions and classes) on the app
container_app: RunningApp = RunningApp(container_args.app_layout)
app_layout: AppLayout = AppLayout(container_args.app_layout)

# Initialize objects on the app.
# This is basically only functions and classes - anything else is deprecated and will be unsupported soon
app: App = synchronizer._translate_out(active_app)
app._init_container(client, container_args.app_id, container_app)
app._init_container(client, container_args.app_id, app_layout)

# Hydrate all function dependencies.
# TODO(erikbern): we an remove this once we
Expand Down
44 changes: 22 additions & 22 deletions modal/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
)
from .proxy import _Proxy
from .retries import Retries
from .running_app import RunningApp
from .app_layout import AppLayout
from .schedule import Schedule
from .scheduler_placement import SchedulerPlacement
from .secret import _Secret
Expand Down Expand Up @@ -167,7 +167,7 @@ def foo():

# Running apps only (container apps or running local)
_app_id: Optional[str] # Kept after app finishes
_running_app: Optional[RunningApp] # Various app info
_app_layout: Optional[AppLayout] # Various app info
_client: Optional[_Client]
_interactive: Optional[bool]

Expand Down Expand Up @@ -212,7 +212,7 @@ def __init__(
self._web_endpoints = []

self._app_id = None
self._running_app = None # Set inside container, OR during the time an app is running locally
self._app_layout = None # Set inside container, OR during the time an app is running locally
self._client = None
self._interactive = None

Expand Down Expand Up @@ -274,7 +274,7 @@ async def lookup(
app = _App(name)
app._app_id = response.app_id
app._client = client
app._running_app = RunningApp()
app._app_layout = AppLayout()
return app

def set_description(self, description: str):
Expand All @@ -301,17 +301,17 @@ def _uncreate_all_objects(self):

@asynccontextmanager
async def _set_local_app(
self, client: _Client, running_app: RunningApp, app_id: str, interactive: bool
self, client: _Client, app_layout: AppLayout, app_id: str, interactive: bool
) -> AsyncGenerator[None, None]:
self._client = client
self._running_app = running_app
self._app_layout = app_layout
self._app_id = app_id
self._interactive = interactive
try:
yield
finally:
self._client = None
self._running_app = None
self._app_layout = None
self._interactive = None
self._uncreate_all_objects()

Expand Down Expand Up @@ -390,7 +390,7 @@ def _get_default_image(self):
return _default_image

def _get_watch_mounts(self):
if not self._running_app:
if not self._app_layout:
raise ExecutionError("`_get_watch_mounts` requires a running app.")

all_mounts = [
Expand All @@ -416,48 +416,48 @@ def _add_function(self, function: _Function, is_web_endpoint: bool):
if function.tag in self._classes:
logger.warning(f"Warning: tag {function.tag} exists but is overridden by function")

if self._running_app:
if self._app_layout:
# If this is inside a container, then objects can be defined after app initialization.
# So we may have to initialize objects once they get bound to the app.
if function.tag in self._running_app.function_ids:
object_id: str = self._running_app.function_ids[function.tag]
metadata: Message = self._running_app.object_handle_metadata[object_id]
if function.tag in self._app_layout.function_ids:
object_id: str = self._app_layout.function_ids[function.tag]
metadata: Message = self._app_layout.object_handle_metadata[object_id]
function._hydrate(object_id, self._client, metadata)

self._functions[function.tag] = function
if is_web_endpoint:
self._web_endpoints.append(function.tag)

def _add_class(self, tag: str, cls: _Cls):
if self._running_app:
if self._app_layout:
# If this is inside a container, then objects can be defined after app initialization.
# So we may have to initialize objects once they get bound to the app.
if tag in self._running_app.class_ids:
object_id: str = self._running_app.class_ids[tag]
metadata: Message = self._running_app.object_handle_metadata[object_id]
if tag in self._app_layout.class_ids:
object_id: str = self._app_layout.class_ids[tag]
metadata: Message = self._app_layout.object_handle_metadata[object_id]
cls._hydrate(object_id, self._client, metadata)

self._classes[tag] = cls

def _init_container(self, client: _Client, app_id: str, running_app: RunningApp):
def _init_container(self, client: _Client, app_id: str, app_layout: AppLayout):
self._app_id = app_id
self._running_app = running_app
self._app_layout = app_layout
self._client = client

_App._container_app = self

# Hydrate function objects
for tag, object_id in running_app.function_ids.items():
for tag, object_id in app_layout.function_ids.items():
if tag in self._functions:
obj = self._functions[tag]
handle_metadata = running_app.object_handle_metadata[object_id]
handle_metadata = app_layout.object_handle_metadata[object_id]
obj._hydrate(object_id, client, handle_metadata)

# Hydrate class objects
for tag, object_id in running_app.class_ids.items():
for tag, object_id in app_layout.class_ids.items():
if tag in self._classes:
obj = self._classes[tag]
handle_metadata = running_app.object_handle_metadata[object_id]
handle_metadata = app_layout.object_handle_metadata[object_id]
obj._hydrate(object_id, client, handle_metadata)

@property
Expand Down
2 changes: 1 addition & 1 deletion modal/running_app.py → modal/app_layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from modal_proto import api_pb2


class RunningApp:
class AppLayout:
function_ids: dict[str, str]
class_ids: dict[str, str]
object_handle_metadata: dict[str, Optional[Message]]
Expand Down
62 changes: 31 additions & 31 deletions modal/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from .exception import InteractiveTimeoutError, InvalidError, RemoteError, _CliUserExecutionError
from .functions import _Function
from .output import _get_output_manager, enable_output
from .running_app import RunningApp
from .app_layout import AppLayout
from .sandbox import _Sandbox
from .secret import _Secret
from .stream_type import StreamType
Expand Down Expand Up @@ -65,7 +65,7 @@ async def _heartbeat(client: _Client, app_id: str) -> None:

async def _init_local_app_existing(
client: _Client, existing_app_id: str, environment_name: str
) -> tuple[RunningApp, AppResult]:
) -> tuple[AppLayout, AppResult]:
# Get all the objects first
obj_req = api_pb2.AppGetLayoutRequest(app_id=existing_app_id)
obj_resp, _ = await gather_cancel_on_exc(
Expand All @@ -74,17 +74,17 @@ async def _init_local_app_existing(
_get_environment_cached(environment_name, client),
)
app_page_url = f"https://modal.com/apps/{existing_app_id}" # TODO (elias): this should come from the backend
running_app: RunningApp = RunningApp(obj_resp.app_layout)
app_layout: AppLayout = AppLayout(obj_resp.app_layout)
app_result: AppResult = AppResult(existing_app_id, app_page_url=app_page_url)
return (running_app, app_result)
return (app_layout, app_result)


async def _init_local_app_new(
client: _Client,
description: str,
app_state: int, # ValueType
environment_name: str = "",
) -> tuple[RunningApp, AppResult]:
) -> tuple[AppLayout, AppResult]:
app_req = api_pb2.AppCreateRequest(
description=description,
environment_name=environment_name,
Expand All @@ -96,21 +96,21 @@ async def _init_local_app_new(
_get_environment_cached(environment_name, client),
)
logger.debug(f"Created new app with id {app_resp.app_id}")
running_app: RunningApp = RunningApp()
app_layout: AppLayout = AppLayout()
app_result: AppResult = AppResult(
app_resp.app_id,
app_page_url=app_resp.app_page_url,
app_logs_url=app_resp.app_logs_url,
)
return (running_app, app_result)
return (app_layout, app_result)


async def _init_local_app_from_name(
client: _Client,
name: str,
namespace: Any,
environment_name: str = "",
) -> tuple[RunningApp, AppResult]:
) -> tuple[AppLayout, AppResult]:
# Look up any existing deployment
app_req = api_pb2.AppGetByDeploymentNameRequest(
name=name,
Expand All @@ -131,7 +131,7 @@ async def _init_local_app_from_name(

async def _create_all_objects(
client: _Client,
running_app: RunningApp,
app_layout: AppLayout,
app: _App,
app_id: str,
environment_name: str,
Expand All @@ -145,9 +145,9 @@ async def _create_all_objects(
)
with resolver.display():
# Get current objects, and reset all objects
tag_to_object_id = {**running_app.function_ids, **running_app.class_ids}
running_app.function_ids = {}
running_app.class_ids = {}
tag_to_object_id = {**app_layout.function_ids, **app_layout.class_ids}
app_layout.function_ids = {}
app_layout.class_ids = {}

# Assign all objects
for tag, obj in indexed_objects.items():
Expand Down Expand Up @@ -175,9 +175,9 @@ async def _load(tag, obj):
existing_object_id = tag_to_object_id.get(tag)
await resolver.load(obj, existing_object_id)
if _Function._is_id_type(obj.object_id):
running_app.function_ids[tag] = obj.object_id
app_layout.function_ids[tag] = obj.object_id
elif _Cls._is_id_type(obj.object_id):
running_app.class_ids[tag] = obj.object_id
app_layout.class_ids[tag] = obj.object_id
else:
raise RuntimeError(f"Unexpected object {obj.object_id}")

Expand All @@ -186,7 +186,7 @@ async def _load(tag, obj):

async def _publish_app(
client: _Client,
running_app: RunningApp,
app_layout: AppLayout,
app_result: AppResult,
app_state: int, # api_pb2.AppState.value
functions: dict[str, _Function],
Expand All @@ -203,8 +203,8 @@ async def _publish_app(
name=name,
deployment_tag=tag,
app_state=app_state, # type: ignore : should be a api_pb2.AppState.value
function_ids=running_app.function_ids,
class_ids=running_app.class_ids,
function_ids=app_layout.function_ids,
class_ids=app_layout.class_ids,
definition_ids=definition_ids,
)
try:
Expand Down Expand Up @@ -279,7 +279,7 @@ async def _run_app(
" Are you calling app.run() directly?"
" Consider using the `modal run` shell command."
)
if app._running_app:
if app._app_layout:
raise InvalidError(
"App is already running and can't be started again.\n"
"You should not use `app.run` or `run_app` within a Modal `local_entrypoint`"
Expand Down Expand Up @@ -309,17 +309,17 @@ async def _run_app(
)
interactive = False

running_app: RunningApp
app_layout: AppLayout
app_result: AppResult
running_app, app_result = await _init_local_app_new(
app_layout, app_result = await _init_local_app_new(
client,
app.description or "",
environment_name=environment_name or "",
app_state=app_state,
)

logs_timeout = config["logs_timeout"]
async with app._set_local_app(client, running_app, app_result.app_id, interactive), TaskContext(
async with app._set_local_app(client, app_layout, app_result.app_id, interactive), TaskContext(
grace=logs_timeout
) as tc:
# Start heartbeats loop to keep the client alive
Expand Down Expand Up @@ -352,10 +352,10 @@ def heartbeat():

try:
# Create all members
await _create_all_objects(client, running_app, app, app_result.app_id, environment_name)
await _create_all_objects(client, app_layout, app, app_result.app_id, environment_name)

# Publish the app
await _publish_app(client, running_app, app_result, app_state, app._functions, app._classes)
await _publish_app(client, app_layout, app_result, app_state, app._functions, app._classes)
except asyncio.CancelledError as e:
# this typically happens on sigint/ctrl-C during setup (the KeyboardInterrupt happens in the main thread)
if output_mgr := _get_output_manager():
Expand Down Expand Up @@ -446,21 +446,21 @@ async def _serve_update(
# Used by child process to reinitialize a served app
client = await _Client.from_env()
try:
running_app: RunningApp
app_layout: AppLayout
app_result: AppResult
running_app, app_result = await _init_local_app_existing(client, existing_app_id, environment_name)
app_layout, app_result = await _init_local_app_existing(client, existing_app_id, environment_name)

# Create objects
await _create_all_objects(
client,
running_app,
app_layout,
app,
app_result.app_id,
environment_name,
)

# Publish the updated app
await _publish_app(client, running_app, app_result, api_pb2.APP_STATE_UNSPECIFIED, app._functions, app._classes)
await _publish_app(client, app_layout, app_result, api_pb2.APP_STATE_UNSPECIFIED, app._functions, app._classes)

# Communicate to the parent process
is_ready.set()
Expand Down Expand Up @@ -527,9 +527,9 @@ async def _deploy_app(

t0 = time.time()

running_app: RunningApp
app_layout: AppLayout
app_result: AppResult
running_app, app_result = await _init_local_app_from_name(
app_layout, app_result = await _init_local_app_from_name(
client, name, namespace, environment_name=environment_name
)

Expand All @@ -544,14 +544,14 @@ def heartbeat():
# Create all members
await _create_all_objects(
client,
running_app,
app_layout,
app,
app_result.app_id,
environment_name=environment_name,
)

app_url, warnings = await _publish_app(
client, running_app, app_result, api_pb2.APP_STATE_DEPLOYED, app._functions, app._classes, name, tag
client, app_layout, app_result, api_pb2.APP_STATE_DEPLOYED, app._functions, app._classes, name, tag
)
except Exception as e:
# Note that AppClientDisconnect only stops the app if it's still initializing, and is a no-op otherwise.
Expand Down

0 comments on commit 7447327

Please sign in to comment.