Generalize BaseStatistics code a bit + document it #1107

Open · wants to merge 3 commits into main
comps/cores/mega/base_statistics.py: 18 additions & 44 deletions

@@ -21,47 +21,23 @@ def append_latency(self, latency, first_token_latency=None):
         if first_token_latency:
             self.first_token_latencies.append(first_token_latency)

-    def calculate_statistics(self):
-        if not self.response_times:
-            return {
-                "p50_latency": None,
-                "p99_latency": None,
-                "average_latency": None,
-            }
-        # Calculate the P50 (median)
-        p50 = np.percentile(self.response_times, 50)
-
-        # Calculate the P99
-        p99 = np.percentile(self.response_times, 99)
-
-        avg = np.average(self.response_times)
-
-        return {
-            "p50_latency": p50,
-            "p99_latency": p99,
-            "average_latency": avg,
-        }
-
-    def calculate_first_token_statistics(self):
-        if not self.first_token_latencies:
-            return {
-                "p50_latency_first_token": None,
-                "p99_latency_first_token": None,
-                "average_latency_first_token": None,
-            }
-        # Calculate the P50 (median)
-        p50 = np.percentile(self.first_token_latencies, 50)
-
-        # Calculate the P99
-        p99 = np.percentile(self.first_token_latencies, 99)
-
-        avg = np.average(self.first_token_latencies)
-
-        return {
-            "p50_latency_first_token": p50,
-            "p99_latency_first_token": p99,
-            "average_latency_first_token": avg,
-        }
+    def _add_statistics(self, result, stats, suffix):
+        "add P50 (median), P99 and average values for 'stats' array to 'result' dict"
+        if stats:
+            result[f"p50_{suffix}"] = np.percentile(stats, 50)
+            result[f"p99_{suffix}"] = np.percentile(stats, 99)
+            result[f"average_{suffix}"] = np.average(stats)
+        else:
+            result[f"p50_{suffix}"] = None
+            result[f"p99_{suffix}"] = None
+            result[f"average_{suffix}"] = None
+
+    def get_statistics(self):
+        "return stats dict with P50, P99 and average values for first token and response timings"
+        result = {}
+        self._add_statistics(result, self.response_times, "latency")
+        self._add_statistics(result, self.first_token_latencies, "latency_first_token")
+        return result


 def register_statistics(
@@ -79,7 +55,5 @@ def collect_all_statistics():
     results = {}
     if statistics_dict:
         for name, statistic in statistics_dict.items():
-            tmp_dict = statistic.calculate_statistics()
-            tmp_dict.update(statistic.calculate_first_token_statistics())
-            results.update({name: tmp_dict})
+            results[name] = statistic.get_statistics()
     return results
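For reference, a minimal sketch of how the refactored statistics object behaves; the class name is taken from the PR title and a no-argument constructor is assumed, while `append_latency` and `get_statistics` come straight from the diff above:

```python
from comps.cores.mega.base_statistics import BaseStatistics  # class name assumed from the PR title

stats = BaseStatistics()  # assumes a no-argument constructor
stats.append_latency(0.42, first_token_latency=0.07)
stats.append_latency(0.38)  # first-token latency is optional

# get_statistics() returns one flat dict; series without samples are reported as None.
print(stats.get_statistics())
# e.g. {'p50_latency': 0.4, 'p99_latency': 0.4196, 'average_latency': 0.4,
#       'p50_latency_first_token': 0.07, 'p99_latency_first_token': 0.07,
#       'average_latency_first_token': 0.07}
```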
comps/cores/telemetry/README.md: 20 additions & 2 deletions

@@ -4,6 +4,19 @@ OPEA Comps currently provides telemetry functionalities for metrics and tracing

 ![opea telemetry](../assets/img/opea_telemetry.jpg)

+Contents:
+
+- [Metrics](#metrics)
+  - [HTTP metrics](#http-metrics)
+  - [Megaservice E2E metrics](#megaservice-e2e-metrics)
+  - [Inferencing metrics](#inferencing-metrics)
+  - [Metrics collection](#metrics-collection)
+- [Statistics](#statistics)
+- [Tracing](#tracing)
+- [Visualization](#visualization)
+  - [Visualize metrics](#visualize-metrics)
+  - [Visualize tracing](#visualize-tracing)
+
 ## Metrics

 OPEA microservice metrics are exported in Prometheus format under the `/metrics` endpoint.
@@ -20,7 +33,7 @@ They can be fetched e.g. with `curl`:
 curl localhost:{port of your service}/metrics
 ```

-### HTTP Metrics
+### HTTP metrics

 Metrics output looks like the following:

@@ -54,7 +67,7 @@ Latency ones are histogram metrics i.e. include count, total value and set of va

 They are available only for _stream_ requests using LLM. Pending count accounts for all requests.

-### Inferencing Metrics
+### Inferencing metrics

 For example, you can `curl localhost:6006/metrics` to retrieve the TEI embedding metrics, and the output should look as follows:

@@ -95,6 +108,11 @@ Below are some default metrics endpoints for specific microservices:
 | TEI embedding | 6006 | /metrics | [link](https://huggingface.github.io/text-embeddings-inference/#/Text%20Embeddings%20Inference/metrics) |
 | TEI reranking | 8808 | /metrics | [link](https://huggingface.github.io/text-embeddings-inference/#/Text%20Embeddings%20Inference/metrics) |

+## Statistics
+
+Additionally, GenAIComps microservices provide a separate `/v1/statistics` endpoint, which outputs P50, P99 and average values
+for response times and first token latencies, if the microservice records them.
+
 ## Tracing

 OPEA uses OpenTelemetry to trace function call stacks. To trace a function, add the `@opea_telemetry` decorator to either an async or sync function. The call stacks and time span data will be exported by OpenTelemetry. You can use Jaeger UI to visualize this tracing data.
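The `/v1/statistics` endpoint documented above can be queried like any other HTTP endpoint. A minimal sketch, reusing the service name and port from the tests further down (`opea_service@s1_add` on port 8083); adjust both for your own deployment:

```python
import requests

# Each registered statistics name maps to its own dict of P50/P99/average values.
res = requests.get("http://localhost:8083/v1/statistics").json()
print(res["opea_service@s1_add"]["p50_latency"])
print(res["opea_service@s1_add"]["p99_latency"])
# First-token keys stay None until first-token latencies have been recorded.
print(res["opea_service@s1_add"]["average_latency_first_token"])
```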
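Similarly, a minimal sketch of the `@opea_telemetry` decorator usage described in the tracing section; the import location is an assumption, so check where `opea_telemetry` is exported in your version of `comps`:

```python
from comps import opea_telemetry  # import path assumed


@opea_telemetry
async def add_prefix(text: str) -> str:
    # The call stack and time span of this call are exported via OpenTelemetry.
    return "opea" + text
```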
comps/finetuning/src/integrations/native.py: 1 addition & 1 deletion

@@ -201,7 +201,7 @@ def list_finetuning_checkpoints(self, request: FineTuningJobIDRequest):
         for file in files: # Loop over directory contents
             file_path = os.path.join(output_dir, file)
             if os.path.isdir(file_path) and file.startswith("checkpoint"):
-                steps = re.findall("\d+", file)[0]
+                steps = re.findall(r"\d+", file)[0]
                 checkpointsResponse = FineTuningJobCheckpoint(
                     id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID
                     created_at=int(time.time()), # Use the current timestamp
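The one-line change above switches to a raw string so that `\d` is passed to the regex engine literally instead of being treated as a string escape, which newer Python versions flag as invalid. A quick illustration of the extraction it performs:

```python
import re

# r"\d+" keeps the backslash for the regex engine; the un-prefixed "\d+" triggers
# an invalid-escape DeprecationWarning/SyntaxWarning on recent Python versions.
steps = re.findall(r"\d+", "checkpoint-1500")[0]
print(steps)  # "1500"
```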
tests/cores/mega/test_base_statistics.py: 41 additions & 5 deletions

@@ -17,18 +17,23 @@
     register_statistics,
     statistics_dict,
 )
+from comps.cores.mega.base_statistics import collect_all_statistics
+
+SVC1 = "opea_service@s1_add"
+SVC2 = "open_service@test"


+@register_statistics(names=[SVC1])
+@register_statistics(names=[SVC2])
 @register_microservice(name="s1", host="0.0.0.0", port=8083, endpoint="/v1/add")
-@register_statistics(names=["opea_service@s1_add"])
 async def s1_add(request: TextDoc) -> TextDoc:
     start = time.time()
     time.sleep(5)
     req = request.model_dump_json()
     req_dict = json.loads(req)
     text = req_dict["text"]
     text += "opea"
-    statistics_dict["opea_service@s1_add"].append_latency(time.time() - start, None)
+    statistics_dict[SVC1].append_latency(time.time() - start, None)
     return {"text": text}


@@ -49,14 +54,45 @@ async def test_base_statistics(self):
         for _ in range(2):
             task1 = asyncio.create_task(self.service_builder.schedule(initial_inputs={"text": "hello, "}))
             await asyncio.gather(task1)
-            result_dict1, _ = task1.result()
+            _result_dict1, _ = task1.result()

         response = requests.get("http://localhost:8083/v1/statistics")
         res = response.json()
-        p50 = res["opea_service@s1_add"]["p50_latency"]
-        p99 = res["opea_service@s1_add"]["p99_latency"]
+        p50 = res[SVC1]["p50_latency"]
+        p99 = res[SVC1]["p99_latency"]
         self.assertEqual(int(p50), int(p99))


+class TestBaseStatisticsLocal(unittest.TestCase):
+    def test_stats(self):
+        stats = statistics_dict[SVC2]
+
+        stats.append_latency(3)
+        res = collect_all_statistics()
+        avg = res[SVC2]["average_latency"]
+        self.assertIsNotNone(avg)
+        p99 = res[SVC2]["p99_latency"]
+        self.assertEqual(int(p99), int(avg))
+        p50 = res[SVC2]["p50_latency"]
+        self.assertEqual(int(p99), int(p50))
+        self.assertIsNone(res[SVC2]["p50_latency_first_token"])
+
+        stats.append_latency(2, 1)
+        res = collect_all_statistics()
+        avg = res[SVC2]["average_latency_first_token"]
+        self.assertIsNotNone(avg)
+        p99 = res[SVC2]["p99_latency_first_token"]
+        self.assertEqual(int(p99), int(avg))
+        p50 = res[SVC2]["p50_latency_first_token"]
+        self.assertEqual(int(p99), int(p50))
+
+        stats.append_latency(1)
+        res = collect_all_statistics()
+        p50 = res[SVC2]["p50_latency"]
+        avg = res[SVC2]["average_latency"]
+        self.assertEqual(int(avg), int(p50))
+        self.assertEqual(int(p50), 2)
+
+
 if __name__ == "__main__":
     unittest.main()
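As a sanity check on the final assertions in `test_stats` above (plain NumPy, which the statistics code uses internally), the three recorded latencies 3, 2 and 1 give a median and average of 2:

```python
import numpy as np

latencies = [3, 2, 1]
print(np.percentile(latencies, 50))  # 2.0 (median)
print(np.average(latencies))         # 2.0
print(np.percentile(latencies, 99))  # 2.98
```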