grpc added
saeid93 committed Nov 28, 2022
1 parent 63f90b2 commit 06909b0
Showing 329 changed files with 294,252 additions and 666,605 deletions.
15 changes: 10 additions & 5 deletions README.md
@@ -56,6 +56,10 @@ Auto-Tuning Video Analytics Pipelines](https://web.stanford.edu/~faromero/llama.
1. [Clarifai Workflows](https://clarifai.com/clarifai/main/workflows)
2. [Facebook DLRM](https://github.com/facebookresearch/dlrm)

## Load Tester
This repo also includes a small async load tester for sending workloads to the models/pipelines.
You can find it under the [async load tester](./async_load_tester) folder.

## Sources of Models

### Audio and Text Models
@@ -72,8 +76,9 @@ Source:
Please give a star if this repo helped you learn something new :)

### TODOs
1. Examples of gRPC data transfer
2. Send data in a compressed way
3. Add performance evaluation scripts and load tester
4. Complete unfinished pipelines
5. Example of using [Triton Client](https://github.com/triton-inference-server/client/tree/main/src/python/examples) for interacting with the MLServer examples
1. Send data in a compressed way
2. Add performance evaluation scripts and load tester
3. Complete unfinished pipelines
4. Examples of using [Triton Client](https://github.com/triton-inference-server/client/tree/main/src/python/examples) for interacting with the MLServer examples
5. Examples of using [Triton Inference Server](https://github.com/triton-inference-server/server) as the serving backend
6. Pipeline implementations in the upcoming Seldon Core V2
5 changes: 0 additions & 5 deletions async_load_tester/README.md
@@ -23,8 +23,3 @@ workload = [7, 12, 0, 31, ...] # each item of the list is the number of request
tester = MyLoadTester(workload=workload, endpoint="http://IP:PORT/PATH", http_method="post")
tester.start()
```

To install load tester move to this folder and do
```
pip install -e .
```
5 changes: 3 additions & 2 deletions async_load_tester/barazmoon/__init__.py
@@ -1,5 +1,6 @@
from barazmoon.main import BarAzmoonProcess
from barazmoon.main import BarAzmoonAsync
from barazmoon.main import BarAzmoonAsyncRest
from barazmoon.mlserver import MLServerProcess
from barazmoon.mlserver import MLServerAsync
from barazmoon.mlserver import MLServerAsyncRest
from barazmoon.mlserver import MLServerAsyncGrpc
# from barazmoon.main import queue
100 changes: 96 additions & 4 deletions async_load_tester/barazmoon/main.py
@@ -7,6 +7,11 @@
from numpy.random import default_rng
import aiohttp
import asyncio
import grpc
from mlserver.codecs.string import StringRequestCodec
import mlserver.grpc.dataplane_pb2_grpc as dataplane
import mlserver.grpc.converters as converters
import json
# from multiprocessing import Queue


@@ -101,10 +106,10 @@ def process_response(self, data_id: str, response: dict):
# outputs = [queue.get() for _ in range(queue.qsize())]
# return outputs

# ============= Pure Async based load tester =============
# ============= Pure Async Rest based load tester =============


async def request_after(session, url, wait, payload):
async def request_after_rest(session, url, wait, payload):
if wait:
await asyncio.sleep(wait)
sending_time = time.time()
@@ -127,7 +132,7 @@ async def request_after(session, url, wait, payload):
return {'failed': 'timeout'}


class BarAzmoonAsync:
class BarAzmoonAsyncRest:
def __init__(self, endpoint, payload, benchmark_duration=1):
"""
endpoint:
@@ -164,7 +169,7 @@ async def submit_requests_after(self, after, req_count, duration):
print(f'Sending {req_count} requests at {time.ctime()} (timestep {after})')
for i in range(req_count):
tasks.append(asyncio.ensure_future(
request_after(
request_after_rest(
self.session,
self.endpoint,
wait=arrival[i],
@@ -182,3 +187,90 @@ async def submit_requests_after(self, after, req_count, duration):

async def close(self):
await self.session.close()
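
Each one-second timestep spreads its requests across the interval by delaying every request with an independent exponential offset whose mean is `duration / req_count`; the same scheduling is shared by the REST tester above and the gRPC tester below. A minimal, self-contained sketch of that idea (the request itself is replaced by a stub and all numbers are hypothetical):

```python
import asyncio
import time

from numpy.random import default_rng


async def fire_at_offsets(req_count: int = 5, duration: float = 1.0):
    # Draw one exponential delay per request, mean = duration / req_count,
    # mirroring `rng.exponential(beta, req_count)` in submit_requests_after.
    rng = default_rng()
    offsets = rng.exponential(duration / req_count, req_count)
    start = time.time()

    async def fake_request(wait: float):
        await asyncio.sleep(wait)        # stand-in for request_after_rest(...)
        return time.time() - start       # when this "request" actually fired

    fired = await asyncio.gather(*(fake_request(w) for w in offsets))
    print([round(t, 3) for t in sorted(fired)])


asyncio.run(fire_at_offsets())
```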


# ============= Pure Async Grpc based load tester =============

async def request_after_grpc(stub, metadata, wait, payload):
# TODO
if wait:
await asyncio.sleep(wait)
sending_time = time.time()
try:
resp = await stub.ModelInfer(
request=payload,
metadata=metadata)

inference_response = converters.ModelInferResponseConverter.to_types(resp)
raw_json = StringRequestCodec.decode_response(inference_response)
resp = json.loads(raw_json[0])

arrival_time = time.time()
timing = {
'timing':{
'sending_time': sending_time,
'arrival_time': arrival_time
}
}
resp.update(timing)
return resp
except asyncio.exceptions.TimeoutError:
return {'failed': 'timeout'}
except grpc.RpcError as e:
return {'failed': str(e)}

class BarAzmoonAsyncGrpc:
# TODO
def __init__(self, endpoint, metadata, payload, benchmark_duration=1):
"""
endpoint:
the path of the load-testing endpoint
payload:
the data to be sent
"""
self.endpoint = endpoint
self.payload = payload
self.metadata = metadata
self.responses = []
self.duration = benchmark_duration

async def benchmark(self, request_counts):
async with grpc.aio.insecure_channel(self.endpoint) as ch:
self.stub = dataplane.GRPCInferenceServiceStub(ch)
tasks = []
for i, req_count in enumerate(request_counts):
tasks.append(
asyncio.ensure_future(
self.submit_requests_after(
i * self.duration, req_count, self.duration)
))
await asyncio.gather(*tasks)

async def submit_requests_after(self, after, req_count, duration):
if after:
await asyncio.sleep(after)
tasks = []
beta = duration / req_count
start = time.time()

rng = default_rng()
arrival = rng.exponential(beta, req_count)

print(f'Sending {req_count} requests at {time.ctime()} (timestep {after})')
for i in range(req_count):
tasks.append(asyncio.ensure_future(
request_after_grpc(
self.stub,
self.metadata,
wait=arrival[i],
payload=self.payload
)
))
resps = await asyncio.gather(*tasks)

elapsed = time.time() - start
if elapsed < duration:
await asyncio.sleep(duration-elapsed)

self.responses.append(resps)
print(f'Received {len(resps)} responses at {time.ctime()} (timestep {after})')
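
The gRPC path expects the payload to already be a `ModelInferRequest`, so callers convert an MLServer `InferenceRequest` with `converters.ModelInferRequestConverter.from_types` before handing it to `BarAzmoonAsyncGrpc`. A rough usage sketch under assumed values (the endpoint, Seldon deployment name, and input data are hypothetical, and the target model is assumed to reply with a JSON string, as `request_after_grpc` expects):

```python
import asyncio

import mlserver.types as types
import mlserver.grpc.converters as converters

from barazmoon.main import BarAzmoonAsyncGrpc

# Build a raw numeric request (shape and values are made up for illustration).
inference_request = types.InferenceRequest(
    inputs=[
        types.RequestInput(
            name="echo_request",
            shape=[1, 3],
            datatype="FP32",
            data=[0.1, 0.2, 0.3],
            parameters=types.Parameters(content_type="np"),
        )
    ]
)
payload = converters.ModelInferRequestConverter.from_types(
    inference_request, model_name=None, model_version=None)

# Seldon routes gRPC calls via these headers (deployment/namespace assumed).
metadata = [("seldon", "audio-qa"), ("namespace", "default")]

tester = BarAzmoonAsyncGrpc("localhost:32000", metadata, payload, benchmark_duration=1)
asyncio.run(tester.benchmark([5, 10]))   # 5 requests in second one, 10 in second two
print(len(tester.responses))             # one list of responses per timestep
```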
81 changes: 76 additions & 5 deletions async_load_tester/barazmoon/mlserver.py
@@ -1,9 +1,12 @@
from typing import List, Tuple, Any
import asyncio

import mlserver.grpc.converters as converters
import aiohttp.payload as aiohttp_payload
import mlserver.types as types

from .main import BarAzmoonProcess
from .main import BarAzmoonAsync
from .main import BarAzmoonAsyncRest
from .main import BarAzmoonAsyncGrpc


class MLServerProcess(BarAzmoonProcess):
@@ -83,7 +86,7 @@ def process_response(self, data_id: str, response: dict):
print(response)


class MLServerAsync:
class MLServerAsyncRest:
def __init__(
self, *, workload: List[int], endpoint: str,
data: Any, data_shape: List[int],
@@ -100,7 +103,7 @@ def __init__(
_, self.payload = self.get_request_data()

async def start(self):
c = BarAzmoonAsync(self.endpoint, self.payload)
c = BarAzmoonAsyncRest(self.endpoint, self.payload)
await c.benchmark(self._workload)
await c.close()
return c.responses
@@ -162,4 +165,72 @@ def get_request_data(self) -> Tuple[str, str]:
}
else:
raise ValueError(f"Unknown datatype {self.kwargs['data_type']}")
return None, aiohttp_payload.JsonPayload(payload)
return None, aiohttp_payload.JsonPayload(payload)


class MLServerAsyncGrpc:
# TODO
def __init__(
self, *, workload: List[int], endpoint: str,
data: Any, data_shape: List[int], model: str,
data_type: str, metadata: List[Tuple[str, str]],
**kwargs,):
self.endpoint = endpoint
self.metadata = metadata
self.model = model
self._workload = (rate for rate in workload)
self._counter = 0
self.data_type = data_type
self.data = data
self.data_shape = data_shape
self.kwargs = kwargs
_, self.payload = self.get_request_data()

async def start(self):
c = BarAzmoonAsyncGrpc(self.endpoint, self.metadata, self.payload)
await c.benchmark(self._workload)
return c.responses

def get_request_data(self) -> Tuple[str, str]:
if self.data_type == 'audio':
payload = types.InferenceRequest(
inputs=[
types.RequestInput(
name="echo_request",
shape=self.data_shape,
datatype="FP32",
data=self.data,
parameters=types.Parameters(content_type="np"),
)
]
)
elif self.data_type == 'text':
payload = types.InferenceRequest(
inputs=[
types.RequestInput(
name="text_inputs",
shape=[1],
datatype="BYTES",
data=[self.data.encode('utf8')],
parameters=types.Parameters(content_type="str"),
)
]
)
elif self.data_type == 'image':
payload = types.InferenceRequest(
inputs=[
types.RequestInput(
name="parameters-np",
shape=self.data_shape,
datatype="INT32",
data=self.data,
parameters=types.Parameters(content_type="np"),
)
]
)
else:
raise ValueError(f"Unknown datatype {self.data_type}")
payload = converters.ModelInferRequestConverter.from_types(
payload, model_name=self.model, model_version=None
)
return None, payload
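
For completeness, a sketch of how `MLServerAsyncGrpc` might be driven end to end; the endpoint, deployment name, workload, and text input below are illustrative placeholders, not values tied to any particular pipeline in the repo:

```python
import asyncio

from barazmoon import MLServerAsyncGrpc

# Seldon-style routing metadata (deployment and namespace are placeholders).
metadata = [("seldon", "my-deployment"), ("namespace", "default")]

tester = MLServerAsyncGrpc(
    workload=[2, 4, 1],              # requests per second, one entry per timestep
    endpoint="localhost:32000",      # gRPC endpoint of the serving gateway
    data="is this a question?",      # encoded to BYTES inside get_request_data
    data_shape=[1],                  # unused for the 'text' branch but required
    model=None,                      # forwarded as model_name to the converter
    data_type="text",
    metadata=metadata,
)

responses = asyncio.run(tester.start())
print(len(responses))                # one list of responses per timestep
```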
67 changes: 67 additions & 0 deletions mlserver/1-paper-audio-qa/seldon-core-version/client-async-grpc.py
@@ -0,0 +1,67 @@
import grpc
from pprint import PrettyPrinter
from mlserver.types import InferenceResponse
from mlserver.grpc.converters import ModelInferResponseConverter
import mlserver.grpc.dataplane_pb2_grpc as dataplane
import mlserver.grpc.converters as converters
from mlserver.codecs.string import StringRequestCodec
pp = PrettyPrinter(indent=4)
from datasets import load_dataset
import mlserver.types as types
import json
import asyncio

# single node seldon+mlserver
endpoint = "localhost:32000"
deployment_name = 'audio-qa'
model = None
namespace = "default"
metadata = [("seldon", deployment_name), ("namespace", namespace)]

batch_test = 30
ds = load_dataset(
"hf-internal-testing/librispeech_asr_demo",
"clean",
split="validation")

input_data = ds[0]["audio"]["array"]

async def send_requests(ch):
grpc_stub = dataplane.GRPCInferenceServiceStub(ch)
inference_request = types.InferenceRequest(
inputs=[
types.RequestInput(
name="echo_request",
shape=[1, len(input_data)],
datatype="FP32",
data=input_data.tolist(),
parameters=types.Parameters(content_type="np"),
)
]
)
inference_request_g = converters.ModelInferRequestConverter.from_types(
inference_request, model_name=model, model_version=None
)
response = await grpc_stub.ModelInfer(
request=inference_request_g,
metadata=metadata)
return response


async def main():
async with grpc.aio.insecure_channel(endpoint) as ch:
responses = await asyncio.gather(*[send_requests(ch) for _ in range(10)])

inference_responses = list(map(
lambda response: ModelInferResponseConverter.to_types(response), responses))
raw_jsons = list(map(
lambda inference_response: StringRequestCodec.decode_response(
inference_response), inference_responses))
outputs = list(map(
lambda raw_json: json.loads(raw_json[0]), raw_jsons))

pp.pprint(outputs)

asyncio.run(main())
@@ -14,7 +14,7 @@
namespace = "default"
endpoint = f"http://{gateway_endpoint}/seldon/{namespace}/{deployment_name}/v2/models/infer"

batch_test = 100
batch_test = 10
responses = []

ds = load_dataset(