diff --git a/comps/cores/mega/base_statistics.py b/comps/cores/mega/base_statistics.py
index b0285e73f3..62b8292c66 100644
--- a/comps/cores/mega/base_statistics.py
+++ b/comps/cores/mega/base_statistics.py
@@ -21,47 +21,23 @@ def append_latency(self, latency, first_token_latency=None):
         if first_token_latency:
             self.first_token_latencies.append(first_token_latency)
 
-    def calculate_statistics(self):
-        if not self.response_times:
-            return {
-                "p50_latency": None,
-                "p99_latency": None,
-                "average_latency": None,
-            }
-        # Calculate the P50 (median)
-        p50 = np.percentile(self.response_times, 50)
-
-        # Calculate the P99
-        p99 = np.percentile(self.response_times, 99)
-
-        avg = np.average(self.response_times)
-
-        return {
-            "p50_latency": p50,
-            "p99_latency": p99,
-            "average_latency": avg,
-        }
-
-    def calculate_first_token_statistics(self):
-        if not self.first_token_latencies:
-            return {
-                "p50_latency_first_token": None,
-                "p99_latency_first_token": None,
-                "average_latency_first_token": None,
-            }
-        # Calculate the P50 (median)
-        p50 = np.percentile(self.first_token_latencies, 50)
-
-        # Calculate the P99
-        p99 = np.percentile(self.first_token_latencies, 99)
-
-        avg = np.average(self.first_token_latencies)
-
-        return {
-            "p50_latency_first_token": p50,
-            "p99_latency_first_token": p99,
-            "average_latency_first_token": avg,
-        }
+    def _add_statistics(self, result, stats, suffix):
+        "add P50 (median), P99 and average values for 'stats' array to 'result' dict"
+        if stats:
+            result[f"p50_{suffix}"] = np.percentile(stats, 50)
+            result[f"p99_{suffix}"] = np.percentile(stats, 99)
+            result[f"average_{suffix}"] = np.average(stats)
+        else:
+            result[f"p50_{suffix}"] = None
+            result[f"p99_{suffix}"] = None
+            result[f"average_{suffix}"] = None
+
+    def get_statistics(self):
+        "return stats dict with P50, P99 and average values for first token and response timings"
+        result = {}
+        self._add_statistics(result, self.response_times, "latency")
+        self._add_statistics(result, self.first_token_latencies, "latency_first_token")
+        return result
 
 
 def register_statistics(
@@ -79,7 +55,5 @@ def collect_all_statistics():
     results = {}
     if statistics_dict:
         for name, statistic in statistics_dict.items():
-            tmp_dict = statistic.calculate_statistics()
-            tmp_dict.update(statistic.calculate_first_token_statistics())
-            results.update({name: tmp_dict})
+            results[name] = statistic.get_statistics()
     return results
diff --git a/comps/cores/telemetry/README.md b/comps/cores/telemetry/README.md
index 9a99e7f000..b644c94373 100644
--- a/comps/cores/telemetry/README.md
+++ b/comps/cores/telemetry/README.md
@@ -4,6 +4,19 @@ OPEA Comps currently provides telemetry functionalities for metrics and tracing
 
 ![opea telemetry](../assets/img/opea_telemetry.jpg)
 
+Contents:
+
+- [Metrics](#metrics)
+  - [HTTP metrics](#http-metrics)
+  - [Megaservice E2E metrics](#megaservice-e2e-metrics)
+  - [Inferencing metrics](#inferencing-metrics)
+  - [Metrics collection](#metrics-collection)
+- [Statistics](#statistics)
+- [Tracing](#tracing)
+- [Visualization](#visualization)
+- [Visualize metrics](#visualize-metrics)
+- [Visualize tracing](#visualize-tracing)
+
 ## Metrics
 
 OPEA microservice metrics are exported in Prometheus format under `/metrics` endpoint.
@@ -20,7 +33,7 @@ They can be fetched e.g. with `curl`:
 curl localhost:{port of your service}/metrics
 ```
 
-### HTTP Metrics
+### HTTP metrics
 
 Metrics output looks following:
 
@@ -54,7 +67,7 @@ Latency ones are histogram metrics i.e. include count, total value and set of va
 They are available only for _stream_ requests using LLM.
 Pending count accounts for all requests.
 
-### Inferencing Metrics
+### Inferencing metrics
 
 For example, you can `curl localhost:6006/metrics` to retrieve the TEI embedding metrics, and the output should look like follows:
 
@@ -95,6 +108,11 @@ Below are some default metrics endpoints for specific microservices:
 | TEI embedding | 6006 | /metrics | [link](https://huggingface.github.io/text-embeddings-inference/#/Text%20Embeddings%20Inference/metrics) |
 | TEI reranking | 8808 | /metrics | [link](https://huggingface.github.io/text-embeddings-inference/#/Text%20Embeddings%20Inference/metrics) |
 
+## Statistics
+
+Additionally, GenAIComps microservices provide a separate `/v1/statistics` endpoint, which outputs P50, P99 and average values
+for response times and first token latencies, if the microservice measures them.
+
 ## Tracing
 
 OPEA use OpenTelemetry to trace function call stacks. To trace a function, add the `@opea_telemetry` decorator to either an async or sync function. The call stacks and time span data will be exported by OpenTelemetry. You can use Jaeger UI to visualize this tracing data.
diff --git a/comps/finetuning/src/integrations/native.py b/comps/finetuning/src/integrations/native.py
index 0227fbf7b0..ad9d9b5707 100644
--- a/comps/finetuning/src/integrations/native.py
+++ b/comps/finetuning/src/integrations/native.py
@@ -201,7 +201,7 @@ def list_finetuning_checkpoints(self, request: FineTuningJobIDRequest):
         for file in files:  # Loop over directory contents
             file_path = os.path.join(output_dir, file)
             if os.path.isdir(file_path) and file.startswith("checkpoint"):
-                steps = re.findall("\d+", file)[0]
+                steps = re.findall(r"\d+", file)[0]
                 checkpointsResponse = FineTuningJobCheckpoint(
                     id=f"ftckpt-{uuid.uuid4()}",  # Generate a unique ID
                     created_at=int(time.time()),  # Use the current timestamp
diff --git a/tests/cores/mega/test_base_statistics.py b/tests/cores/mega/test_base_statistics.py
index 878b3016c5..4eaae92f1e 100644
--- a/tests/cores/mega/test_base_statistics.py
+++ b/tests/cores/mega/test_base_statistics.py
@@ -17,10 +17,15 @@
     register_statistics,
     statistics_dict,
 )
+from comps.cores.mega.base_statistics import collect_all_statistics
 
+SVC1 = "opea_service@s1_add"
+SVC2 = "open_service@test"
+
 
+@register_statistics(names=[SVC1])
+@register_statistics(names=[SVC2])
 @register_microservice(name="s1", host="0.0.0.0", port=8083, endpoint="/v1/add")
-@register_statistics(names=["opea_service@s1_add"])
 async def s1_add(request: TextDoc) -> TextDoc:
     start = time.time()
     time.sleep(5)
@@ -28,7 +33,7 @@ async def s1_add(request: TextDoc) -> TextDoc:
     req_dict = json.loads(req)
     text = req_dict["text"]
     text += "opea"
-    statistics_dict["opea_service@s1_add"].append_latency(time.time() - start, None)
+    statistics_dict[SVC1].append_latency(time.time() - start, None)
     return {"text": text}
 
 
@@ -49,14 +54,45 @@ async def test_base_statistics(self):
         for _ in range(2):
             task1 = asyncio.create_task(self.service_builder.schedule(initial_inputs={"text": "hello, "}))
             await asyncio.gather(task1)
-            result_dict1, _ = task1.result()
+            _result_dict1, _ = task1.result()
 
         response = requests.get("http://localhost:8083/v1/statistics")
         res = response.json()
-        p50 = res["opea_service@s1_add"]["p50_latency"]
-        p99 = res["opea_service@s1_add"]["p99_latency"]
+        p50 = res[SVC1]["p50_latency"]
+        p99 = res[SVC1]["p99_latency"]
         self.assertEqual(int(p50), int(p99))
 
 
+class TestBaseStatisticsLocal(unittest.TestCase):
+    def test_stats(self):
+        stats = statistics_dict[SVC2]
+
+        stats.append_latency(3)
+        res = collect_all_statistics()
+        avg = res[SVC2]["average_latency"]
+        self.assertIsNotNone(avg)
+        p99 = res[SVC2]["p99_latency"]
+        self.assertEqual(int(p99), int(avg))
+        p50 = res[SVC2]["p50_latency"]
+        self.assertEqual(int(p99), int(p50))
+        self.assertIsNone(res[SVC2]["p50_latency_first_token"])
+
+        stats.append_latency(2, 1)
+        res = collect_all_statistics()
+        avg = res[SVC2]["average_latency_first_token"]
+        self.assertIsNotNone(avg)
+        p99 = res[SVC2]["p99_latency_first_token"]
+        self.assertEqual(int(p99), int(avg))
+        p50 = res[SVC2]["p50_latency_first_token"]
+        self.assertEqual(int(p99), int(p50))
+
+        stats.append_latency(1)
+        res = collect_all_statistics()
+        p50 = res[SVC2]["p50_latency"]
+        avg = res[SVC2]["average_latency"]
+        self.assertEqual(int(avg), int(p50))
+        self.assertEqual(int(p50), 2)
+
+
 if __name__ == "__main__":
     unittest.main()
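
For reviewers who want to poke at the refactored statistics logic outside a running microservice, below is a minimal, self-contained sketch that mirrors `_add_statistics()`/`get_statistics()` from the diff above. The `MiniStats` class, its `__init__` fields, and the sample latencies are illustrative stand-ins for `BaseStatistics`, not part of the PR.

```python
import numpy as np


class MiniStats:
    """Illustrative stand-in for BaseStatistics, mirroring the refactored helpers."""

    def __init__(self):
        self.response_times = []
        self.first_token_latencies = []

    def append_latency(self, latency, first_token_latency=None):
        self.response_times.append(latency)
        if first_token_latency:
            self.first_token_latencies.append(first_token_latency)

    def _add_statistics(self, result, stats, suffix):
        # P50 (median), P99 and average for 'stats', or None when nothing was recorded yet.
        if stats:
            result[f"p50_{suffix}"] = np.percentile(stats, 50)
            result[f"p99_{suffix}"] = np.percentile(stats, 99)
            result[f"average_{suffix}"] = np.average(stats)
        else:
            result[f"p50_{suffix}"] = None
            result[f"p99_{suffix}"] = None
            result[f"average_{suffix}"] = None

    def get_statistics(self):
        result = {}
        self._add_statistics(result, self.response_times, "latency")
        self._add_statistics(result, self.first_token_latencies, "latency_first_token")
        return result


if __name__ == "__main__":
    stats = MiniStats()
    stats.append_latency(3)      # response time only
    stats.append_latency(2, 1)   # response time plus first-token latency
    # Prints p50/p99/average values under "latency" and "latency_first_token" suffixes.
    print(stats.get_statistics())
```

The printed dict has the same key shape that the `/v1/statistics` endpoint returns per registered service name (e.g. `p50_latency`, `p99_latency_first_token`), which is what the tests above assert against.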