Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime): Add prototype Runloop runtime impl #4598

Merged
merged 19 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions openhands/core/config/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class AppConfig:
file_uploads_max_file_size_mb: int = 0
file_uploads_restrict_file_types: bool = False
file_uploads_allowed_extensions: list[str] = field(default_factory=lambda: ['.*'])
runloop_api_key: str | None = None

defaults_dict: ClassVar[dict] = {}

Expand Down Expand Up @@ -139,6 +140,7 @@ def __str__(self):
'jwt_secret',
'modal_api_token_id',
'modal_api_token_secret',
'runloop_api_key',
]:
attr_value = '******' if attr_value else None

Expand Down
4 changes: 4 additions & 0 deletions openhands/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def get_runtime_cls(name: str):
from openhands.runtime.impl.modal.modal_runtime import ModalRuntime

return ModalRuntime
elif name == 'runloop':
from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime

return RunloopRuntime
else:
raise ValueError(f'Runtime {name} not supported')

Expand Down
31 changes: 31 additions & 0 deletions openhands/runtime/impl/runloop/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Runloop Runtime
Runloop provides a fast, secure and scalable AI sandbox (Devbox).
Check out the [runloop docs](https://docs.runloop.ai/overview/what-is-runloop)
for more detail

## Access
Runloop is currently available in a closed beta. For early access, or
just to say hello, sign up at https://www.runloop.ai/hello

## Set up
With your runloop API,
```bash
export RUNLOOP_API_KEY=<your-api-key>
```

Configure the runtime
```bash
export RUNTIME="runloop"
```

## Interact with your devbox
Runloop provides additional tools to interact with your Devbox based
runtime environment. See the [docs](https://docs.runloop.ai/tools) for an up
to date list of tools.

### Dashboard
View logs, ssh into, or view your Devbox status from the [dashboard](https://platform.runloop.ai)

### CLI
Use the Runloop CLI to view logs, execute commands, and more.
See the setup instructions [here](https://docs.runloop.ai/tools/cli)
272 changes: 272 additions & 0 deletions openhands/runtime/impl/runloop/runloop_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import logging
import threading
import time
from typing import Callable

import requests
import tenacity
from runloop_api_client import Runloop
from runloop_api_client.types import DevboxView
from runloop_api_client.types.shared_params import LaunchParameters

from openhands.core.config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
LogBuffer,
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.request import send_request
from openhands.utils.tenacity_stop import stop_if_should_exit


class RunloopLogBuffer(LogBuffer):
"""Synchronous buffer for Runloop devbox logs.

This class provides a thread-safe way to collect, store, and retrieve logs
from a Docker container. It uses a list to store log lines and provides methods
for appending, retrieving, and clearing logs.
"""

def __init__(self, runloop_api_client: Runloop, devbox_id: str):
self.client_ready = False
self.init_msg = 'Runtime client initialized.'

self.buffer: list[str] = []
self.lock = threading.Lock()
self._stop_event = threading.Event()
self.runloop_api_client = runloop_api_client
self.devbox_id = devbox_id
self.log_index = 0
self.log_stream_thread = threading.Thread(target=self.stream_logs)
self.log_stream_thread.daemon = True
self.log_stream_thread.start()

def stream_logs(self):
"""Stream logs from the Docker container in a separate thread.

This method runs in its own thread to handle the blocking
operation of reading log lines from the Docker SDK's synchronous generator.
"""

try:
# TODO(Runloop) Replace with stream
while True:
raw_logs = self.runloop_api_client.devboxes.logs.list(
self.devbox_id
).logs[self.log_index :]
logs = [
log.message
for log in raw_logs
if log.message and log.cmd_id is None
]

self.log_index += len(raw_logs)
if self._stop_event.is_set():
break
if logs:
for log_line in logs:
self.append(log_line)
if self.init_msg in log_line:
self.client_ready = True

time.sleep(1)
except Exception as e:
logger.error(f'Error streaming runloop logs: {e}')

# NB: Match LogBuffer behavior on below methods

def get_and_clear(self) -> list[str]:
with self.lock:
logs = list(self.buffer)
self.buffer.clear()
return logs

def append(self, log_line: str):
with self.lock:
self.buffer.append(log_line)

def close(self, timeout: float = 5.0):
self._stop_event.set()
self.log_stream_thread.join(timeout)


class RunloopRuntime(EventStreamRuntime):
"""The RunloopRuntime class is an EventStreamRuntime that utilizes Runloop Devbox as a runtime environment."""

_sandbox_port: int = 4444

def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_callback: Callable | None = None,
attach_to_existing: bool = False,
):
assert config.runloop_api_key is not None, 'Runloop API key is required'
self.devbox: DevboxView | None = None
self.config = config
self.runloop_api_client = Runloop(
bearer_token=config.runloop_api_key,
)
self.session = requests.Session()
self.container_name = self.container_name_prefix + sid
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self.init_base_runtime(
config,
event_stream,
sid,
plugins,
env_vars,
status_callback,
attach_to_existing,
Copy link
Collaborator

@rbren rbren Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to make better use of this var--we use it pretty aggressively now. We create several Runtime objects within a session, and they all need to get access to the same runtime.

Can we track the runtime on runloop by sid, so that we can retrieve it if available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes - so essentially, if attach_to_existing, we should look up by sid and first try to use the existing sandbox. is that the correct understanding? if so, I will get that in

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup exactly!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great - do you know if any tests cover this flow? i didn't see at first glance

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm no I don't think we actually have integration tests here! We definitely should...

)
# Buffer for container logs
self.log_buffer: LogBuffer | None = None

@tenacity.retry(
stop=tenacity.stop_after_attempt(120),
wait=tenacity.wait_fixed(1),
)
def _wait_for_devbox(self, devbox: DevboxView) -> DevboxView:
"""Pull devbox status until it is running"""
if devbox == 'running':
return devbox

devbox = self.runloop_api_client.devboxes.retrieve(id=devbox.id)
if devbox.status != 'running':
raise ConnectionRefusedError('Devbox is not running')

# Devbox is connected and running
logging.debug(f'devbox.id={devbox.id} is running')
return devbox

def _create_new_devbox(self) -> DevboxView:
# Note: Runloop connect
sandbox_workspace_dir = self.config.workspace_mount_path_in_sandbox
plugin_args = []
if self.plugins is not None and len(self.plugins) > 0:
plugin_args.append('--plugins')
plugin_args.extend([plugin.name for plugin in self.plugins])

browsergym_args = []
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_args = [
'-browsergym-eval-env',
self.config.sandbox.browsergym_eval_env,
]

# Copied from EventstreamRuntime
start_command = get_remote_startup_command(
self._sandbox_port,
sandbox_workspace_dir,
'openhands' if self.config.run_as_openhands else 'root',
self.config.sandbox.user_id,
plugin_args,
browsergym_args,
)

# Add some additional commands based on our image
# NB: start off as root, action_execution_server will ultimately choose user but expects all context
# (ie browser) to be installed as root
start_command = (
'export MAMBA_ROOT_PREFIX=/openhands/micromamba && '
'cd /openhands/code && '
+ '/openhands/micromamba/bin/micromamba run -n openhands poetry config virtualenvs.path /openhands/poetry && '
+ ' '.join(start_command)
)
entrypoint = f"sudo bash -c '{start_command}'"

devbox = self.runloop_api_client.devboxes.create(
entrypoint=entrypoint,
setup_commands=[f'mkdir -p {self.config.workspace_mount_path_in_sandbox}'],
name=self.sid,
environment_variables={'DEBUG': 'true'} if self.config.debug else {},
prebuilt='openhands',
launch_parameters=LaunchParameters(
available_ports=[self._sandbox_port],
resource_size_request="LARGE",
),
metadata={'container-name': self.container_name},
)
return self._wait_for_devbox(devbox)

async def connect(self):
self.send_status_message('STATUS$STARTING_RUNTIME')

if self.attach_to_existing:
active_devboxes = self.runloop_api_client.devboxes.list(
status='running'
).devboxes
self.devbox = next(
(devbox for devbox in active_devboxes if devbox.name == self.sid), None
)

if self.devbox is None:
self.devbox = self._create_new_devbox()

# Create tunnel - this will return a stable url, so is safe to call if we are attaching to existing
tunnel = self.runloop_api_client.devboxes.create_tunnel(
id=self.devbox.id,
port=self._sandbox_port,
)

# Hook up logs
self.log_buffer = RunloopLogBuffer(self.runloop_api_client, self.devbox.id)
self.api_url = f'https://{tunnel.url}'
logger.info(f'Container started. Server url: {self.api_url}')

# End Runloop connect
# NOTE: Copied from EventStreamRuntime
logger.info('Waiting for client to become ready...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
self._wait_until_alive()

if not self.attach_to_existing:
self.setup_initial_env()

logger.info(
f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
)
self.send_status_message(' ')

@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
wait=tenacity.wait_fixed(1),
reraise=(ConnectionRefusedError,),
)
def _wait_until_alive(self):
# NB(Runloop): Remote logs are not guaranteed realtime, removing client_ready check from logs
self._refresh_logs()
if not self.log_buffer:
raise RuntimeError('Runtime client is not ready.')
response = send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
)
if response.status_code == 200:
return
else:
msg = f'Action execution API is not alive. Response: {response}'
logger.error(msg)
raise RuntimeError(msg)

def close(self, rm_all_containers: bool = True):
if self.log_buffer:
self.log_buffer.close()

if self.session:
self.session.close()

if self.attach_to_existing:
return

if self.devbox:
self.runloop_api_client.devboxes.shutdown(self.devbox.id)
Loading
Loading