diff --git a/docs/source/serving/sky-serve.rst b/docs/source/serving/sky-serve.rst
index 1c4ee3f2751..3ccbed140c0 100644
--- a/docs/source/serving/sky-serve.rst
+++ b/docs/source/serving/sky-serve.rst
@@ -22,7 +22,7 @@ Why SkyServe?
 
 How it works:
 
-- Each service gets an endpoint that automatically redirects requests to its replicas.
+- Each service gets an endpoint that automatically distributes requests to its replicas.
 - Replicas of the same service can run in different regions and clouds — reducing cloud costs and increasing availability.
 - SkyServe handles the load balancing, recovery, and autoscaling of the replicas.
 
@@ -127,7 +127,7 @@ Run :code:`sky serve up service.yaml` to deploy the service with automatic price
 
 If you see the :code:`STATUS` column becomes :code:`READY`, then the service is ready to accept traffic!
 
-Simply ``curl -L`` the service endpoint, which automatically load-balances across the two replicas:
+Simply ``curl`` the service endpoint, which automatically load-balances across the two replicas:
 
 .. tab-set::
 
@@ -136,7 +136,7 @@ Simply ``curl -L`` the service endpoint, which automatically load-balances acros
 
     .. code-block:: console
 
-      $ curl -L 3.84.15.251:30001/v1/chat/completions \
+      $ curl 3.84.15.251:30001/v1/chat/completions \
          -X POST \
          -d '{"model": "mistralai/Mixtral-8x7B-Instruct-v0.1", "messages": [{"role": "user", "content": "Who are you?"}]}' \
          -H 'Content-Type: application/json'
@@ -149,7 +149,7 @@ Simply ``curl -L`` the service endpoint, which automatically load-balances acros
 
     .. code-block:: console
 
-      $ curl -L 44.211.131.51:30001/generate \
+      $ curl 44.211.131.51:30001/generate \
         -X POST \
         -d '{"inputs":"What is Deep Learning?","parameters":{"max_new_tokens":20}}' \
         -H 'Content-Type: application/json'
@@ -240,7 +240,7 @@ Under the hood, :code:`sky serve up`:
 #. Launches a controller which handles autoscaling, monitoring and load balancing;
 #. Returns a Service Endpoint which will be used to accept traffic;
 #. Meanwhile, the controller provisions replica VMs which later run the services;
-#. Once any replica is ready, the requests sent to the Service Endpoint will be **HTTP-redirect** to one of the endpoint replicas.
+#. Once any replica is ready, the requests sent to the Service Endpoint will be distributed to one of the endpoint replicas.
 
 After the controller is provisioned, you'll see the following in :code:`sky serve status` output:
 
@@ -264,7 +264,7 @@ sending requests to :code:`<endpoint>` (e.g., ``44.201.119.3:30001``):
 
 .. code-block:: console
 
-   $ curl -L <endpoint>
+   $ curl <endpoint>
 
    My First SkyServe Service
 
@@ -274,12 +274,6 @@ sending requests to :code:`<endpoint>` (e.g., ``44.201.119.3:30001``):
 
-.. note::
-
-  Since we are using HTTP-redirect, we need to use :code:`curl -L
-  <endpoint>`. The :code:`curl` command by default won't follow the
-  redirect.
-
 Tutorial: Serve a Chatbot LLM!
 ------------------------------
 
@@ -368,7 +362,7 @@ Send a request using the following cURL command:
 
 .. code-block:: console
 
-   $ curl -L http://<endpoint>/v1/chat/completions \
+   $ curl http://<endpoint>/v1/chat/completions \
      -X POST \
      -d '{"model":"vicuna-7b-v1.3","messages":[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"Who are you?"}],"temperature":0}' \
      -H 'Content-Type: application/json'
@@ -468,7 +462,7 @@ SkyServe has a centralized controller VM that manages the deployment of your ser
 It is composed of the following components:
 
 #. **Controller**: The controller will monitor the status of the replicas and re-launch a new replica if one of them fails.
   It also autoscales the number of replicas if autoscaling config is set (see :ref:`Service YAML spec <service-yaml-spec>` for more information).
-#. **Load Balancer**: The load balancer will route the traffic to all ready replicas. It is a lightweight HTTP server that listens on the service endpoint and **HTTP-redirects** the requests to one of the replicas.
+#. **Load Balancer**: The load balancer will route the traffic to all ready replicas. It is a lightweight HTTP server that listens on the service endpoint and distributes the requests to one of the replicas.
 
 All of the process group shares a single controller VM. The controller VM will be launched in the cloud with the best price/performance ratio. You can also :ref:`customize the controller resources <customizing-sky-serve-controller-resources>` based on your needs.
 
diff --git a/examples/cog/README.md b/examples/cog/README.md
index 4fa4890420f..b2193e2e18f 100644
--- a/examples/cog/README.md
+++ b/examples/cog/README.md
@@ -28,7 +28,7 @@ After the service is launched, access the deployment with the following:
 
 ```console
 ENDPOINT=$(sky serve status --endpoint cog)
-curl -L http://$ENDPOINT/predictions -X POST \
+curl http://$ENDPOINT/predictions -X POST \
   -H 'Content-Type: application/json' \
   -d '{"input": {"image": "https://blog.skypilot.co/introducing-sky-serve/images/sky-serve-thumbnail.png"}}' \
   | jq -r '.output | split(",")[1]' | base64 --decode > output.png
diff --git a/examples/serve/misc/cancel/README.md b/examples/serve/misc/cancel/README.md
index 65b88c2d540..61c24383909 100644
--- a/examples/serve/misc/cancel/README.md
+++ b/examples/serve/misc/cancel/README.md
@@ -1,6 +1,6 @@
 # SkyServe cancel example
 
-This example demonstrates the redirect support canceling a request.
+This example demonstrates the SkyServe load balancer's support for canceling a request.
 
 ## Running the example
 
@@ -33,7 +33,7 @@ Client disconnected, stopping computation.
 You can also run
 
 ```bash
-curl -L http://<endpoint>/
+curl http://<endpoint>/
 ```
 
 and manually Ctrl + C to cancel the request and see logs.
diff --git a/llm/codellama/README.md b/llm/codellama/README.md
index 1ed02e301d1..8e5025d22b5 100644
--- a/llm/codellama/README.md
+++ b/llm/codellama/README.md
@@ -68,7 +68,7 @@ Launching a cluster 'code-llama'. Proceed? [Y/n]:
 ```bash
 IP=$(sky status --ip code-llama)
 
-curl -L http://$IP:8000/v1/completions \
+curl http://$IP:8000/v1/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "codellama/CodeLlama-70b-Instruct-hf",
@@ -131,7 +131,7 @@ availability of the service while minimizing the cost.
 ```bash
 ENDPOINT=$(sky serve status --endpoint code-llama)
-curl -L http://$ENDPOINT/v1/completions \
+curl http://$ENDPOINT/v1/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "codellama/CodeLlama-70b-Instruct-hf",
@@ -146,7 +146,7 @@ We can also access the Code Llama service with the openAI Chat API.
 
 ```bash
 ENDPOINT=$(sky serve status --endpoint code-llama)
-curl -L http://$ENDPOINT/v1/chat/completions \
+curl http://$ENDPOINT/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "codellama/CodeLlama-70b-Instruct-hf",
diff --git a/llm/dbrx/README.md b/llm/dbrx/README.md
index 4cb6ad47d6e..e0ad216e92c 100644
--- a/llm/dbrx/README.md
+++ b/llm/dbrx/README.md
@@ -256,7 +256,7 @@ ENDPOINT=$(sky serve status --endpoint dbrx)
 To curl the endpoint:
 
 ```console
-curl -L $ENDPOINT/v1/chat/completions \
+curl $ENDPOINT/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "databricks/dbrx-instruct",
diff --git a/llm/gemma/README.md b/llm/gemma/README.md
index 2801c4fd6f3..ef5027b2807 100644
--- a/llm/gemma/README.md
+++ b/llm/gemma/README.md
@@ -37,7 +37,7 @@ After the cluster is launched, we can access the model with the following comman
 ```bash
 IP=$(sky status --ip gemma)
 
-curl -L http://$IP:8000/v1/completions \
+curl http://$IP:8000/v1/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "google/gemma-7b-it",
@@ -50,7 +50,7 @@ Chat API is also supported:
 
 ```bash
 IP=$(sky status --ip gemma)
 
-curl -L http://$IP:8000/v1/chat/completions \
+curl http://$IP:8000/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "google/gemma-7b-it",
@@ -78,7 +78,7 @@ After the cluster is launched, we can access the model with the following comman
 ```bash
 ENDPOINT=$(sky serve status --endpoint gemma)
 
-curl -L http://$ENDPOINT/v1/completions \
+curl http://$ENDPOINT/v1/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "google/gemma-7b-it",
@@ -89,7 +89,7 @@ curl -L http://$ENDPOINT/v1/completions \
 Chat API is also supported:
 
 ```bash
-curl -L http://$ENDPOINT/v1/chat/completions \
+curl http://$ENDPOINT/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "google/gemma-7b-it",
diff --git a/llm/mixtral/README.md b/llm/mixtral/README.md
index 208b40ca14b..0bddb77c665 100644
--- a/llm/mixtral/README.md
+++ b/llm/mixtral/README.md
@@ -53,7 +53,7 @@ We can now access the model through the OpenAI API with the IP and port:
 ```bash
 IP=$(sky status --ip mixtral)
 
-curl -L http://$IP:8000/v1/completions \
+curl http://$IP:8000/v1/completions \
   -H "Content-Type: 
application/json" \ -d '{ "model": "mistralai/Mixtral-8x7B-Instruct-v0.1", @@ -66,7 +66,7 @@ Chat API is also supported: ```bash IP=$(sky status --ip mixtral) -curl -L http://$IP:8000/v1/chat/completions \ +curl http://$IP:8000/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "mistralai/Mixtral-8x7B-Instruct-v0.1", @@ -119,7 +119,7 @@ After the `sky serve up` command, there will be a single endpoint for the servic ```bash ENDPOINT=$(sky serve status --endpoint mixtral) -curl -L http://$ENDPOINT/v1/completions \ +curl http://$ENDPOINT/v1/completions \ -H "Content-Type: application/json" \ -d '{ "model": "mistralai/Mixtral-8x7B-Instruct-v0.1", @@ -132,7 +132,7 @@ Chat API is also supported: ```bash ENDPOINT=$(sky serve status --endpoint mixtral) -curl -L http://$ENDPOINT/v1/chat/completions \ +curl http://$ENDPOINT/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "mistralai/Mixtral-8x7B-Instruct-v0.1", diff --git a/llm/qwen/README.md b/llm/qwen/README.md index 6ab9bb22ffc..113bbd9e740 100644 --- a/llm/qwen/README.md +++ b/llm/qwen/README.md @@ -34,7 +34,7 @@ sky launch -c qwen serve-110b.yaml ```bash IP=$(sky status --ip qwen) -curl -L http://$IP:8000/v1/completions \ +curl http://$IP:8000/v1/completions \ -H "Content-Type: application/json" \ -d '{ "model": "Qwen/Qwen1.5-110B-Chat", @@ -45,7 +45,7 @@ curl -L http://$IP:8000/v1/completions \ 3. Send a request for chat completion: ```bash -curl -L http://$IP:8000/v1/chat/completions \ +curl http://$IP:8000/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "Qwen/Qwen1.5-110B-Chat", @@ -92,11 +92,11 @@ As shown, the service is now backed by 2 replicas, one on Azure and one on GCP, type is chosen to be **the cheapest available one** on the clouds. That said, it maximizes the availability of the service while minimizing the cost. -3. To access the model, we use a `curl -L` command (`-L` to follow redirect) to send the request to the endpoint: +3. To access the model, we use a `curl` command to send the request to the endpoint: ```bash ENDPOINT=$(sky serve status --endpoint qwen) -curl -L http://$ENDPOINT/v1/chat/completions \ +curl http://$ENDPOINT/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "Qwen/Qwen1.5-72B-Chat", diff --git a/llm/sglang/README.md b/llm/sglang/README.md index 3ffcc2f484b..fc79529148a 100644 --- a/llm/sglang/README.md +++ b/llm/sglang/README.md @@ -68,7 +68,7 @@ ENDPOINT=$(sky serve status --endpoint sglang-llava) ```bash -curl -L $ENDPOINT/v1/chat/completions \ +curl $ENDPOINT/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "liuhaotian/llava-v1.6-vicuna-7b", @@ -149,7 +149,7 @@ ENDPOINT=$(sky serve status --endpoint sglang-llama2) 4. 
Once it status is `READY`, you can use the endpoint to interact with the model:
 
 ```bash
-curl -L $ENDPOINT/v1/chat/completions \
+curl $ENDPOINT/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "meta-llama/Llama-2-7b-chat-hf",
diff --git a/llm/tgi/README.md b/llm/tgi/README.md
index 8fb68222d68..8c8360d0465 100644
--- a/llm/tgi/README.md
+++ b/llm/tgi/README.md
@@ -17,7 +17,7 @@ A user can access the model with the following command:
 ```bash
 ENDPOINT=$(sky status --endpoint 8080 tgi)
 
-curl -L $(sky serve status tgi --endpoint)/generate \
+curl $(sky serve status tgi --endpoint)/generate \
   -H 'Content-Type: application/json' \
   -d '{
     "inputs": "What is Deep Learning?",
@@ -51,7 +51,7 @@ After the service is launched, we can access the model with the following comman
 ```bash
 ENDPOINT=$(sky serve status --endpoint tgi)
 
-curl -L $ENDPOINT/generate \
+curl $ENDPOINT/generate \
   -H 'Content-Type: application/json' \
   -d '{
     "inputs": "What is Deep Learning?",
diff --git a/llm/vllm/README.md b/llm/vllm/README.md
index 568b8ff70bd..61932cd8571 100644
--- a/llm/vllm/README.md
+++ b/llm/vllm/README.md
@@ -154,7 +154,7 @@ ENDPOINT=$(sky serve status --endpoint vllm-llama2)
 4. Once it status is `READY`, you can use the endpoint to interact with the model:
 
 ```bash
-curl -L $ENDPOINT/v1/chat/completions \
+curl $ENDPOINT/v1/chat/completions \
   -H "Content-Type: application/json" \
   -d '{
     "model": "meta-llama/Llama-2-7b-chat-hf",
@@ -171,7 +171,7 @@ curl -L $ENDPOINT/v1/chat/completions \
   }'
 ```
 
-Notice that it is the same with previously curl command, except for thr `-L` argument. You should get a similar response as the following:
+Notice that it is the same as the previous curl command. You should get a response similar to the following:
 
 ```console
 {
diff --git a/sky/serve/README.md b/sky/serve/README.md
index 1131849a8d3..838f4dd6d3b 100644
--- a/sky/serve/README.md
+++ b/sky/serve/README.md
@@ -2,7 +2,7 @@
 
 Serving library for SkyPilot.
 
-The goal of Sky Serve is simple - expose one endpoint, that redirects to serving endpoints running on different resources, regions and clouds.
+The goal of Sky Serve is simple: expose one endpoint that distributes any incoming traffic to serving endpoints running on different resources, regions, and clouds.
 
 Sky Serve transparently handles load balancing, failover and autoscaling of the serving endpoints.
 
@@ -11,8 +11,8 @@ Sky Serve transparently handles load balancing, failover and autoscaling of the
 ![Architecture](../../docs/source/images/sky-serve-architecture.png)
 
 Sky Serve has four key components:
-1. Redirector - receiving requests and redirecting them to healthy endpoints.
-2. Load balancers - spread requests across healthy endpoints according to different policies.
+1. Load Balancers - receive requests and distribute them to healthy endpoints.
+2. Load Balancing Policies - spread requests across healthy endpoints according to different policies.
 3. Autoscalers - scale up and down the number of serving endpoints according to different policies.
 4. Replica Managers - monitoring replica status and handle recovery of unhealthy endpoints.
 
diff --git a/sky/serve/constants.py b/sky/serve/constants.py
index 07f3e837ed4..89ca683ada5 100644
--- a/sky/serve/constants.py
+++ b/sky/serve/constants.py
@@ -21,6 +21,18 @@
 # interval.
 LB_CONTROLLER_SYNC_INTERVAL_SECONDS = 20
 
+# The maximum number of retries the load balancer makes for each request.
+# After switching to the proxy implementation, we retry failed requests.
+# TODO(tian): Expose this option to users in yaml file.
+LB_MAX_RETRY = 3
+
+# The timeout in seconds for the load balancer to wait for a replica response.
+# Large LLMs like Llama2-70b are able to process a request within ~30 seconds.
+# We set the timeout to 120s to be safe. For reference, FastChat uses 100s:
+# https://github.com/lm-sys/FastChat/blob/f2e6ca964af7ad0585cadcf16ab98e57297e2133/fastchat/constants.py#L39 # pylint: disable=line-too-long
+# TODO(tian): Expose this option to users in yaml file.
+LB_STREAM_TIMEOUT = 120
+
 # Interval in seconds to probe replica endpoint.
 ENDPOINT_PROBE_INTERVAL_SECONDS = 10
 
diff --git a/sky/serve/core.py b/sky/serve/core.py
index 79aa53f7b58..f193a85285b 100644
--- a/sky/serve/core.py
+++ b/sky/serve/core.py
@@ -285,7 +285,7 @@ def up(
         f'{backend_utils.BOLD}watch -n10 sky serve status {service_name}'
         f'{backend_utils.RESET_BOLD}'
         '\nTo send a test request:\t\t'
-        f'{backend_utils.BOLD}curl -L {endpoint}'
+        f'{backend_utils.BOLD}curl {endpoint}'
         f'{backend_utils.RESET_BOLD}'
         '\n'
         f'\n{fore.GREEN}SkyServe is spinning up your service now.'
diff --git a/sky/serve/load_balancer.py b/sky/serve/load_balancer.py
index 7864e242148..24d0958489d 100644
--- a/sky/serve/load_balancer.py
+++ b/sky/serve/load_balancer.py
@@ -1,29 +1,30 @@
-"""LoadBalancer: redirect any incoming request to an endpoint replica."""
+"""LoadBalancer: distribute any incoming request to a ready replica."""
+import asyncio
 import logging
 import threading
-import time
+from typing import Dict, Union
 
+import aiohttp
 import fastapi
-import requests
+import httpx
+from starlette import background
 import uvicorn
 
 from sky import sky_logging
 from sky.serve import constants
 from sky.serve import load_balancing_policies as lb_policies
 from sky.serve import serve_utils
+from sky.utils import common_utils
 
 logger = sky_logging.init_logger(__name__)
 
 
 class SkyServeLoadBalancer:
-    """SkyServeLoadBalancer: redirect incoming traffic.
+    """SkyServeLoadBalancer: distribute incoming traffic by proxying.
 
-    This class accept any traffic to the controller and redirect it
+    This class accepts any traffic to the controller and proxies it
     to the appropriate endpoint replica according to the load balancing
     policy.
-
-    NOTE: HTTP redirect is used. Thus, when using `curl`, be sure to use
-    `curl -L`.
     """
 
    def __init__(self, controller_url: str, load_balancer_port: int) -> None:
@@ -34,14 +35,27 @@ def __init__(self, controller_url: str, load_balancer_port: int) -> None:
             load_balancer_port: The port where the load balancer listens to.
         """
         self._app = fastapi.FastAPI()
-        self._controller_url = controller_url
-        self._load_balancer_port = load_balancer_port
+        self._controller_url: str = controller_url
+        self._load_balancer_port: int = load_balancer_port
         self._load_balancing_policy: lb_policies.LoadBalancingPolicy = (
             lb_policies.RoundRobinPolicy())
         self._request_aggregator: serve_utils.RequestsAggregator = (
             serve_utils.RequestTimestamp())
-
-    def _sync_with_controller(self):
+        # TODO(tian): httpx.Client has a resource limit of 100 max connections
+        # for each client. We should wait for feedback on the best max
+        # connections.
+        # Reference: https://www.python-httpx.org/advanced/resource-limits/
+        #
+        # If more than 100 requests are sent to the same replica, the
+        # httpx.Client will queue the requests and send them when a
+        # connection is available.
+        # Reference: https://github.com/encode/httpcore/blob/a8f80980daaca98d556baea1783c5568775daadc/httpcore/_async/connection_pool.py#L69-L71 # pylint: disable=line-too-long
+        self._client_pool: Dict[str, httpx.AsyncClient] = dict()
+        # We need this lock to avoid getting from the client pool while
+        # updating it from _sync_with_controller.
+        self._client_pool_lock: threading.Lock = threading.Lock()
+
+    async def _sync_with_controller(self):
         """Sync with controller periodically.
 
         Every `constants.LB_CONTROLLER_SYNC_INTERVAL_SECONDS` seconds, the
@@ -51,58 +65,157 @@ def __init__(self, controller_url: str, load_balancer_port: int) -> None:
         autoscaling decisions.
         """
         # Sleep for a while to wait the controller bootstrap.
-        time.sleep(5)
+        await asyncio.sleep(5)
         while True:
-            with requests.Session() as session:
+            close_client_tasks = []
+            async with aiohttp.ClientSession() as session:
                 try:
                     # Send request information
-                    response = session.post(
-                        self._controller_url + '/controller/load_balancer_sync',
-                        json={
-                            'request_aggregator':
-                                self._request_aggregator.to_dict()
-                        },
-                        timeout=5)
-                    # Clean up after reporting request information to avoid OOM.
-                    self._request_aggregator.clear()
-                    response.raise_for_status()
-                    ready_replica_urls = response.json().get(
-                        'ready_replica_urls')
-                except requests.RequestException as e:
-                    print(f'An error occurred: {e}')
+                    async with session.post(
+                            self._controller_url +
+                            '/controller/load_balancer_sync',
+                            json={
+                                'request_aggregator':
+                                    self._request_aggregator.to_dict()
+                            },
+                            timeout=5,
+                    ) as response:
+                        # Clean up after reporting request info to avoid OOM.
+                        self._request_aggregator.clear()
+                        response.raise_for_status()
+                        response_json = await response.json()
+                        ready_replica_urls = response_json.get(
+                            'ready_replica_urls', [])
+                except aiohttp.ClientError as e:
+                    logger.error('An error occurred when syncing with '
+                                 f'the controller: {e}')
                 else:
                     logger.info(f'Available Replica URLs: {ready_replica_urls}')
-                    self._load_balancing_policy.set_ready_replicas(
-                        ready_replica_urls)
-            time.sleep(constants.LB_CONTROLLER_SYNC_INTERVAL_SECONDS)
-
-    async def _redirect_handler(self, request: fastapi.Request):
+                    with self._client_pool_lock:
+                        self._load_balancing_policy.set_ready_replicas(
+                            ready_replica_urls)
+                        for replica_url in ready_replica_urls:
+                            if replica_url not in self._client_pool:
+                                self._client_pool[replica_url] = (
+                                    httpx.AsyncClient(base_url=replica_url))
+                        urls_to_close = set(
+                            self._client_pool.keys()) - set(ready_replica_urls)
+                        client_to_close = []
+                        for replica_url in urls_to_close:
+                            client_to_close.append(
+                                self._client_pool.pop(replica_url))
+                    for client in client_to_close:
+                        close_client_tasks.append(client.aclose())
+
+            await asyncio.sleep(constants.LB_CONTROLLER_SYNC_INTERVAL_SECONDS)
+            # Await those tasks after the interval to avoid blocking.
+            await asyncio.gather(*close_client_tasks)
+
+    async def _proxy_request_to(
+            self, url: str, request: fastapi.Request
+    ) -> Union[fastapi.responses.Response, Exception]:
+        """Proxy the request to the specified URL.
+
+        Returns:
+            The response from the endpoint replica. Returns the exception
+            encountered if anything goes wrong.
+        """
+        logger.info(f'Proxying request to {url}')
+        try:
+            # We defer fetching the client here on purpose, for the case when
+            # the replica is ready in `_proxy_with_retries` but refreshed
+            # before entering this function. In that case we return an error
+            # here and retry with the next ready replica. We also need to wait
+            # for the update of the client pool to finish before getting the
+            # client.
+            with self._client_pool_lock:
+                client = self._client_pool.get(url, None)
+            if client is None:
+                return RuntimeError(f'Client for {url} not found.')
+            worker_url = httpx.URL(path=request.url.path,
+                                   query=request.url.query.encode('utf-8'))
+            proxy_request = client.build_request(
+                request.method,
+                worker_url,
+                headers=request.headers.raw,
+                content=await request.body(),
+                timeout=constants.LB_STREAM_TIMEOUT)
+            proxy_response = await client.send(proxy_request, stream=True)
+            return fastapi.responses.StreamingResponse(
+                content=proxy_response.aiter_raw(),
+                status_code=proxy_response.status_code,
+                headers=proxy_response.headers,
+                background=background.BackgroundTask(proxy_response.aclose))
+        except (httpx.RequestError, httpx.HTTPStatusError) as e:
+            logger.error(f'Error when proxying request to {url}: '
+                         f'{common_utils.format_exception(e)}')
+            return e
+
+    async def _proxy_with_retries(
+            self, request: fastapi.Request) -> fastapi.responses.Response:
+        """Try to proxy the request to the endpoint replica with retries."""
         self._request_aggregator.add(request)
-        ready_replica_url = self._load_balancing_policy.select_replica(request)
-
-        if ready_replica_url is None:
-            raise fastapi.HTTPException(status_code=503,
-                                        detail='No ready replicas. '
-                                        'Use "sky serve status [SERVICE_NAME]" '
-                                        'to check the replica status.')
-
-        path = f'{ready_replica_url}{request.url.path}'
-        logger.info(f'Redirecting request to {path}')
-        return fastapi.responses.RedirectResponse(url=path)
+        # TODO(tian): Finetune backoff parameters.
+        backoff = common_utils.Backoff(initial_backoff=1)
+        # SkyServe supports serving on Spot Instances. To avoid preemptions
+        # during request handling, we add a retry here.
+        retry_cnt = 0
+        while True:
+            retry_cnt += 1
+            with self._client_pool_lock:
+                ready_replica_url = self._load_balancing_policy.select_replica(
+                    request)
+            if ready_replica_url is None:
+                response_or_exception = fastapi.HTTPException(
+                    # 503 means that the server is currently
+                    # unable to handle the incoming requests.
+                    status_code=503,
+                    detail='No ready replicas. '
+                    'Use "sky serve status [SERVICE_NAME]" '
+                    'to check the replica status.')
+            else:
+                response_or_exception = await self._proxy_request_to(
+                    ready_replica_url, request)
+            if not isinstance(response_or_exception, Exception):
+                return response_or_exception
+            # When the user aborts the request during streaming, the request
+            # will be disconnected. We do not need to retry for this case.
+            if await request.is_disconnected():
+                # 499 means a client terminates the connection
+                # before the server is able to respond.
+                return fastapi.responses.Response(status_code=499)
+            # TODO(tian): Fail fast for errors like 404 not found.
+            if retry_cnt == constants.LB_MAX_RETRY:
+                if isinstance(response_or_exception, fastapi.HTTPException):
+                    raise response_or_exception
+                exception = common_utils.remove_color(
+                    common_utils.format_exception(response_or_exception,
                                                   use_bracket=True))
+                raise fastapi.HTTPException(
+                    # 500 means internal server error.
+                    status_code=500,
+                    detail=f'Max retries {constants.LB_MAX_RETRY} exceeded. '
+                    f'Last error encountered: {exception}. Please use '
+                    '"sky serve logs [SERVICE_NAME] --load-balancer" '
+                    'for more information.')
+            current_backoff = backoff.current_backoff()
+            logger.error(f'Retry in {current_backoff} seconds.')
+            await asyncio.sleep(current_backoff)
 
     def run(self):
         self._app.add_api_route('/{path:path}',
-                                self._redirect_handler,
+                                self._proxy_with_retries,
                                 methods=['GET', 'POST', 'PUT', 'DELETE'])
 
         @self._app.on_event('startup')
-        def configure_logger():
+        async def startup():
+            # Configure logger
             uvicorn_access_logger = logging.getLogger('uvicorn.access')
             for handler in uvicorn_access_logger.handlers:
                 handler.setFormatter(sky_logging.FORMATTER)
 
-        threading.Thread(target=self._sync_with_controller, daemon=True).start()
+            # Register controller synchronization task
+            asyncio.create_task(self._sync_with_controller())
 
         logger.info('SkyServe Load Balancer started on '
                     f'http://0.0.0.0:{self._load_balancer_port}')
diff --git a/sky/serve/load_balancing_policies.py b/sky/serve/load_balancing_policies.py
index c8c9aa07765..34c1fa4249b 100644
--- a/sky/serve/load_balancing_policies.py
+++ b/sky/serve/load_balancing_policies.py
@@ -11,6 +11,14 @@
 logger = sky_logging.init_logger(__name__)
 
 
+def _request_repr(request: 'fastapi.Request') -> str:
+    return ('<Request '
+            f'method="{request.method}" '
+            f'url="{request.url}" '
+            f'headers={dict(request.headers)} '
+            f'query_params={dict(request.query_params)}>')
+
+
 class LoadBalancingPolicy:
     """Abstract class for load balancing policies."""
 
@@ -20,39 +28,43 @@ def __init__(self) -> None:
     def set_ready_replicas(self, ready_replicas: List[str]) -> None:
         raise NotImplementedError
 
+    def select_replica(self, request: 'fastapi.Request') -> Optional[str]:
+        replica = self._select_replica(request)
+        if replica is not None:
+            logger.info(f'Selected replica {replica} '
+                        f'for request {_request_repr(request)}')
+        else:
+            logger.warning('No replica selected for request '
+                           f'{_request_repr(request)}')
+        return replica
+
     # TODO(tian): We should have an abstract class for Request to
     # compatible with all frameworks.
-    def select_replica(self, request: 'fastapi.Request') -> Optional[str]:
+    def _select_replica(self, request: 'fastapi.Request') -> Optional[str]:
         raise NotImplementedError
 
 
 class RoundRobinPolicy(LoadBalancingPolicy):
     """Round-robin load balancing policy."""
 
-    def __init__(self, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
+    def __init__(self) -> None:
+        super().__init__()
         self.index = 0
 
     def set_ready_replicas(self, ready_replicas: List[str]) -> None:
-        if set(ready_replicas) != set(self.ready_replicas):
-            # If the autoscaler keeps scaling up and down the replicas,
-            # we need this shuffle to not let the first replica have the
-            # most of the load.
-            random.shuffle(ready_replicas)
-            self.ready_replicas = ready_replicas
-            self.index = 0
+        if set(self.ready_replicas) == set(ready_replicas):
+            return
+        # If the autoscaler keeps scaling up and down the replicas,
+        # we need this shuffle to not let the first replica have the
+        # most of the load.
+        random.shuffle(ready_replicas)
+        self.ready_replicas = ready_replicas
+        self.index = 0
 
-    def select_replica(self, request: 'fastapi.Request') -> Optional[str]:
+    def _select_replica(self, request: 'fastapi.Request') -> Optional[str]:
+        del request  # Unused.
         if not self.ready_replicas:
             return None
         ready_replica_url = self.ready_replicas[self.index]
         self.index = (self.index + 1) % len(self.ready_replicas)
-        request_repr = ('<Request '
-                        f'method="{request.method}" '
-                        f'url="{request.url}" '
-                        f'headers={dict(request.headers)} '
-                        f'query_params={dict(request.query_params)}>')
-        logger.info(f'Selected replica {ready_replica_url} '
-                    f'for request {request_repr}')
         return ready_replica_url
diff --git a/sky/templates/sky-serve-controller.yaml.j2 b/sky/templates/sky-serve-controller.yaml.j2
index 351a89ae7f6..8f79b653a2b 100644
--- a/sky/templates/sky-serve-controller.yaml.j2
+++ b/sky/templates/sky-serve-controller.yaml.j2
@@ -11,8 +11,10 @@ setup: |
   {%- endfor %}
 
   # Install serve dependencies.
+  # TODO(tian): Gather those into serve constants.
   pip list | grep uvicorn > /dev/null 2>&1 || pip install uvicorn > /dev/null 2>&1
   pip list | grep fastapi > /dev/null 2>&1 || pip install fastapi > /dev/null 2>&1
+  pip list | grep httpx > /dev/null 2>&1 || pip install httpx > /dev/null 2>&1
 
 file_mounts:
   {{remote_task_yaml_path}}: {{local_task_yaml_path}}
diff --git a/tests/skyserve/auto_restart.yaml b/tests/skyserve/auto_restart.yaml
index f7dc2a13f07..2a3a31051b9 100644
--- a/tests/skyserve/auto_restart.yaml
+++ b/tests/skyserve/auto_restart.yaml
@@ -7,6 +7,7 @@ service:
 
 resources:
   ports: 8080
+  cloud: gcp
   cpus: 2+
 
 workdir: examples/serve/http_server
diff --git a/tests/skyserve/llm/get_response.py b/tests/skyserve/llm/get_response.py
index f0fa530effc..9dd6ea53804 100644
--- a/tests/skyserve/llm/get_response.py
+++ b/tests/skyserve/llm/get_response.py
@@ -27,4 +27,6 @@
             'temperature': 0,
         })
 
+    if resp.status_code != 200:
+        raise RuntimeError(f'Failed to get response: {resp.text}')
     print(resp.json()['choices'][0]['message']['content'])
diff --git a/tests/skyserve/streaming/example.txt b/tests/skyserve/streaming/example.txt
new file mode 100644
index 00000000000..0e9cd7421d3
--- /dev/null
+++ b/tests/skyserve/streaming/example.txt
@@ -0,0 +1 @@
+Hello! How can I help you today?
\ No newline at end of file
diff --git a/tests/skyserve/streaming/send_streaming_request.py b/tests/skyserve/streaming/send_streaming_request.py
new file mode 100644
index 00000000000..7c56d929761
--- /dev/null
+++ b/tests/skyserve/streaming/send_streaming_request.py
@@ -0,0 +1,26 @@
+import argparse
+
+import requests
+
+with open('tests/skyserve/streaming/example.txt', 'r') as f:
+    WORD_TO_STREAM = f.read()
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--endpoint', type=str, required=True)
+args = parser.parse_args()
+url = f'http://{args.endpoint}/'
+
+expected = WORD_TO_STREAM.split()
+index = 0
+# The server yields one word per chunk; verify that the words arrive
+# incrementally and in order through the load balancer proxy.
+with requests.get(url, stream=True) as response:
+    response.raise_for_status()
+    for chunk in response.iter_content(chunk_size=8192):
+        if chunk:
+            current = chunk.decode().strip()
+            assert current == expected[index], (current, expected[index])
+            index += 1
+assert index == len(expected)
+
+print('Streaming test passed')
diff --git a/tests/skyserve/streaming/server.py b/tests/skyserve/streaming/server.py
new file mode 100644
index 00000000000..d9528af2205
--- /dev/null
+++ b/tests/skyserve/streaming/server.py
@@ -0,0 +1,25 @@
+import asyncio
+
+import fastapi
+import uvicorn
+
+with open('example.txt', 'r') as f:
+    WORD_TO_STREAM = f.read()
+
+app = fastapi.FastAPI()
+
+
+@app.get('/')
+async def stream():
+
+    # Stream one word every 0.2s to emulate incremental LLM output.
+    async def generate_words():
+        for word in WORD_TO_STREAM.split():
+            yield word + "\n"
+            await asyncio.sleep(0.2)
+
+    return fastapi.responses.StreamingResponse(generate_words(),
+                                               media_type="text/plain")
+
+
+uvicorn.run(app, host='0.0.0.0', port=8080)
diff --git a/tests/skyserve/streaming/streaming.yaml b/tests/skyserve/streaming/streaming.yaml
new file mode 100644
index 00000000000..a352d120dde
--- /dev/null
+++ b/tests/skyserve/streaming/streaming.yaml
@@ -0,0 +1,13 @@
+service:
+  readiness_probe: /
+  replicas: 1
+
+resources:
+  cpus: 2+
+  ports: 8080
+
+workdir: tests/skyserve/streaming
+
+setup: pip install fastapi uvicorn
+
+run: python server.py
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index 2d542001eb7..284ac5aa471 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -3183,7 +3183,7 @@ def _get_skyserve_http_test(name: str, cloud: str,
             f'sky serve up -n {name} -y tests/skyserve/http/{cloud}.yaml',
             _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2),
             f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; '
-            'curl -L http://$endpoint | grep "Hi, SkyPilot here"',
+            'curl http://$endpoint | grep "Hi, SkyPilot here"',
         ],
         _TEARDOWN_SERVICE.format(name=name),
         timeout=timeout_minutes * 60,
@@ -3305,11 +3305,11 @@ def test_skyserve_spot_recovery():
             f'sky serve up -n {name} -y tests/skyserve/spot/recovery.yaml',
             _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=1),
             f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; '
-            'request_output=$(curl -L http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"',
+            'request_output=$(curl http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"',
             _terminate_gcp_replica(name, zone, 1),
             _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=1),
             f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; '
-            'request_output=$(curl -L http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"',
+            'request_output=$(curl http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"',
         ],
         _TEARDOWN_SERVICE.format(name=name),
         timeout=20 * 60,
@@ -3404,7 +3404,7 @@ def test_skyserve_user_bug_restart(generic_cloud: 
str): f'echo "$s" | grep -B 100 "NO_REPLICA" | grep "0/0"', f'sky serve update {name} --cloud {generic_cloud} -y tests/skyserve/auto_restart.yaml', f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'until curl -L http://$endpoint | grep "Hi, SkyPilot here!"; do sleep 2; done; sleep 2; ' + 'until curl http://$endpoint | grep "Hi, SkyPilot here!"; do sleep 2; done; sleep 2; ' + _check_replica_in_status(name, [(1, False, 'READY'), (1, False, 'FAILED')]), ], @@ -3452,7 +3452,7 @@ def test_skyserve_auto_restart(): f'sky serve up -n {name} -y tests/skyserve/auto_restart.yaml', _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=1), f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'request_output=$(curl -L http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"', + 'request_output=$(curl http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"', # sleep for 20 seconds (initial delay) to make sure it will # be restarted f'sleep 20', @@ -3472,7 +3472,7 @@ def test_skyserve_auto_restart(): ' sleep 10;' f'done); sleep {serve.LB_CONTROLLER_SYNC_INTERVAL_SECONDS};', f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'request_output=$(curl -L http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"', + 'request_output=$(curl http://$endpoint); echo "$request_output"; echo "$request_output" | grep "Hi, SkyPilot here"', ], _TEARDOWN_SERVICE.format(name=name), timeout=20 * 60, @@ -3505,6 +3505,25 @@ def test_skyserve_cancel(generic_cloud: str): run_one_test(test) +@pytest.mark.serve +def test_skyserve_streaming(generic_cloud: str): + """Test skyserve with streaming""" + name = _get_service_name() + test = Test( + f'test-skyserve-streaming', + [ + f'sky serve up -n {name} --cloud {generic_cloud} -y tests/skyserve/streaming/streaming.yaml', + _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=1), + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' + 'python3 tests/skyserve/streaming/send_streaming_request.py ' + '--endpoint $endpoint | grep "Streaming test passed"', + ], + _TEARDOWN_SERVICE.format(name=name), + timeout=20 * 60, + ) + run_one_test(test) + + @pytest.mark.serve def test_skyserve_update(generic_cloud: str): """Test skyserve with update""" @@ -3514,14 +3533,14 @@ def test_skyserve_update(generic_cloud: str): [ f'sky serve up -n {name} --cloud {generic_cloud} -y tests/skyserve/update/old.yaml', _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2), - f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl -L http://$endpoint | grep "Hi, SkyPilot here"', + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl http://$endpoint | grep "Hi, SkyPilot here"', f'sky serve update {name} --cloud {generic_cloud} --mode blue_green -y tests/skyserve/update/new.yaml', # sleep before update is registered. 
'sleep 20', f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'until curl -L http://$endpoint | grep "Hi, new SkyPilot here!"; do sleep 2; done;' + 'until curl http://$endpoint | grep "Hi, new SkyPilot here!"; do sleep 2; done;' # Make sure the traffic is not mixed - 'curl -L http://$endpoint | grep "Hi, new SkyPilot here"', + 'curl http://$endpoint | grep "Hi, new SkyPilot here"', # The latest 2 version should be READY and the older versions should be shutting down (_check_replica_in_status(name, [(2, False, 'READY'), (2, False, 'SHUTTING_DOWN')]) + @@ -3545,14 +3564,14 @@ def test_skyserve_rolling_update(generic_cloud: str): [ f'sky serve up -n {name} --cloud {generic_cloud} -y tests/skyserve/update/old.yaml', _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2), - f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl -L http://$endpoint | grep "Hi, SkyPilot here"', + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl http://$endpoint | grep "Hi, SkyPilot here"', f'sky serve update {name} --cloud {generic_cloud} -y tests/skyserve/update/new.yaml', # Make sure the traffic is mixed across two versions, the replicas # with even id will sleep 60 seconds before being ready, so we # should be able to get observe the period that the traffic is mixed # across two versions. f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'until curl -L http://$endpoint | grep "Hi, new SkyPilot here!"; do sleep 2; done; sleep 2; ' + 'until curl http://$endpoint | grep "Hi, new SkyPilot here!"; do sleep 2; done; sleep 2; ' # The latest version should have one READY and the one of the older versions should be shutting down f'{single_new_replica} {_check_service_version(name, "1,2")} ' # Check the output from the old version, immediately after the @@ -3561,7 +3580,7 @@ def test_skyserve_rolling_update(generic_cloud: str): # TODO(zhwu): we should have a more generalized way for checking the # mixed version of replicas to avoid depending on the specific # round robin load balancing policy. - 'curl -L http://$endpoint | grep "Hi, SkyPilot here"', + 'curl http://$endpoint | grep "Hi, SkyPilot here"', ], _TEARDOWN_SERVICE.format(name=name), timeout=20 * 60, @@ -3579,7 +3598,7 @@ def test_skyserve_fast_update(generic_cloud: str): [ f'sky serve up -n {name} -y --cloud {generic_cloud} tests/skyserve/update/bump_version_before.yaml', _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2), - f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl -L http://$endpoint | grep "Hi, SkyPilot here"', + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl http://$endpoint | grep "Hi, SkyPilot here"', f'sky serve update {name} --cloud {generic_cloud} --mode blue_green -y tests/skyserve/update/bump_version_after.yaml', # sleep to wait for update to be registered. 'sleep 30', @@ -3592,7 +3611,7 @@ def test_skyserve_fast_update(generic_cloud: str): _check_service_version(name, "2")), _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=3) + _check_service_version(name, "2"), - f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl -L http://$endpoint | grep "Hi, SkyPilot here"', + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl http://$endpoint | grep "Hi, SkyPilot here"', # Test rolling update f'sky serve update {name} --cloud {generic_cloud} -y tests/skyserve/update/bump_version_before.yaml', # sleep to wait for update to be registered. 
@@ -3602,7 +3621,7 @@ def test_skyserve_fast_update(generic_cloud: str): (1, False, 'SHUTTING_DOWN')]), _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2) + _check_service_version(name, "3"), - f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl -L http://$endpoint | grep "Hi, SkyPilot here"', + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl http://$endpoint | grep "Hi, SkyPilot here"', ], _TEARDOWN_SERVICE.format(name=name), timeout=30 * 60, @@ -3621,7 +3640,7 @@ def test_skyserve_update_autoscale(generic_cloud: str): _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2) + _check_service_version(name, "1"), f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'curl -L http://$endpoint | grep "Hi, SkyPilot here"', + 'curl http://$endpoint | grep "Hi, SkyPilot here"', f'sky serve update {name} --cloud {generic_cloud} --mode blue_green -y tests/skyserve/update/num_min_one.yaml', # sleep before update is registered. 'sleep 20', @@ -3629,7 +3648,7 @@ def test_skyserve_update_autoscale(generic_cloud: str): _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=1) + _check_service_version(name, "2"), f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'curl -L http://$endpoint | grep "Hi, SkyPilot here!"', + 'curl http://$endpoint | grep "Hi, SkyPilot here!"', # Rolling Update f'sky serve update {name} --cloud {generic_cloud} -y tests/skyserve/update/num_min_two.yaml', # sleep before update is registered. @@ -3638,7 +3657,7 @@ def test_skyserve_update_autoscale(generic_cloud: str): _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2) + _check_service_version(name, "3"), f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'curl -L http://$endpoint | grep "Hi, SkyPilot here!"', + 'curl http://$endpoint | grep "Hi, SkyPilot here!"', ], _TEARDOWN_SERVICE.format(name=name), timeout=30 * 60, @@ -3680,7 +3699,7 @@ def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str): _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=2) + _check_service_version(name, "1"), f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 's=$(curl -L http://$endpoint); echo "$s"; echo "$s" | grep "Hi, SkyPilot here"', + 's=$(curl http://$endpoint); echo "$s"; echo "$s" | grep "Hi, SkyPilot here"', f'sky serve update {name} --cloud {generic_cloud} --mode {mode} -y tests/skyserve/update/new_autoscaler_after.yaml', # Wait for update to be registered f'sleep 120', @@ -3691,7 +3710,7 @@ def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str): *update_check, _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=5), f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' - 'curl -L http://$endpoint | grep "Hi, SkyPilot here"', + 'curl http://$endpoint | grep "Hi, SkyPilot here"', _check_replica_in_status(name, [(4, True, 'READY'), (1, False, 'READY')]), ],
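
The refactor in `sky/serve/load_balancing_policies.py` above splits replica selection into a public `select_replica` (which logs) and a policy-specific `_select_replica`, so an alternative load balancing strategy only needs to implement `set_ready_replicas` and `_select_replica`. Below is a minimal, hypothetical sketch of such a policy — the `LeastLoadedPolicy` name and its request-counting heuristic are invented for illustration and are not part of this change; it assumes, as `RoundRobinPolicy` does, that the base class initializes `self.ready_replicas`:

```python
# Hypothetical sketch: pick the ready replica that has served the fewest
# requests. Only the base-class interface (set_ready_replicas /
# _select_replica) comes from sky/serve/load_balancing_policies.py.
import collections
from typing import List, Optional

import fastapi

from sky.serve import load_balancing_policies as lb_policies


class LeastLoadedPolicy(lb_policies.LoadBalancingPolicy):
    """Select the ready replica that has handled the fewest requests."""

    def __init__(self) -> None:
        super().__init__()
        self._counts: collections.Counter = collections.Counter()

    def set_ready_replicas(self, ready_replicas: List[str]) -> None:
        # Drop counters for replicas that are no longer ready.
        self._counts = collections.Counter(
            {url: self._counts[url] for url in ready_replicas})
        self.ready_replicas = ready_replicas

    def _select_replica(self, request: 'fastapi.Request') -> Optional[str]:
        del request  # Unused, mirroring RoundRobinPolicy.
        if not self.ready_replicas:
            return None
        url = min(self.ready_replicas, key=lambda u: self._counts[u])
        self._counts[url] += 1
        return url
```

Because the load balancer calls `select_replica` and `set_ready_replicas` while holding `_client_pool_lock`, a policy like this would drop in without touching the proxying or retry logic.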