Skip to content

Commit

Permalink
- Extract CollectedMetricToJson() to internal/metrics/collect.cc
Browse files Browse the repository at this point in the history
- Add Reset() to metric types

PiperOrigin-RevId: 518025229
Change-Id: I42245b7b49872cf53f9258492793d32de1a087a4
  • Loading branch information
ChromeHearts authored and copybara-github committed Mar 20, 2023
1 parent 80c85f2 commit c78ccb1
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 85 deletions.
2 changes: 0 additions & 2 deletions python/tensorstore/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,6 @@ pybind11_cc_library(
"//tensorstore/internal:global_initializer",
"//tensorstore/internal/metrics:collect",
"//tensorstore/internal/metrics:registry",
"//tensorstore/util:str_cat",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_github_pybind_pybind11//:pybind11",
],
alwayslink = True,
Expand Down
69 changes: 1 addition & 68 deletions python/tensorstore/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,82 +20,16 @@
#include <string>
#include <vector>

#include <nlohmann/json.hpp>
#include "python/tensorstore/json_type_caster.h"
#include "python/tensorstore/tensorstore_module_components.h"
#include "tensorstore/internal/global_initializer.h"
#include "tensorstore/internal/metrics/collect.h"
#include "tensorstore/internal/metrics/registry.h"
#include "tensorstore/util/str_cat.h"

namespace tensorstore {
namespace internal_python {
namespace {

/// Converts a CollectedMetric to json.
::nlohmann::json CollectedMetricToJson(
const internal_metrics::CollectedMetric& metric) {
::nlohmann::json::object_t result;
result["name"] = metric.metric_name;

auto set_field_keys = [&](auto& v, ::nlohmann::json::object_t& h) {
assert(metric.field_names.size() == v.fields.size());
for (size_t i = 0; i < metric.field_names.size(); ++i) {
if (metric.field_names[i] == "value" ||
metric.field_names[i] == "count" ||
metric.field_names[i] == "max_value" ||
metric.field_names[i] == "sum") {
h[tensorstore::StrCat("_", metric.field_names[i])] = v.fields[i];
} else {
h[std::string(metric.field_names[i])] = v.fields[i];
}
}
};

std::vector<::nlohmann::json> values;
if (!metric.gauges.empty()) {
for (const auto& v : metric.gauges) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
std::visit([&](auto x) { tmp["value"] = x; }, v.value);
std::visit([&](auto x) { tmp["max_value"] = x; }, v.max_value);
values.push_back(std::move(tmp));
}
} else if (!metric.values.empty()) {
for (const auto& v : metric.values) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
std::visit([&](auto x) { tmp["value"] = x; }, v.value);
values.push_back(std::move(tmp));
}
} else if (!metric.counters.empty()) {
for (const auto& v : metric.counters) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
std::visit([&](auto x) { tmp["count"] = x; }, v.value);
values.push_back(std::move(tmp));
}
} else if (!metric.histograms.empty()) {
for (const auto& v : metric.histograms) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
tmp["count"] = v.count;
tmp["sum"] = v.sum;

size_t end = v.buckets.size();
while (end > 0 && v.buckets[end - 1] == 0) end--;

auto it = v.buckets.begin();
for (size_t i = 0; i < end; ++i) {
tmp[tensorstore::StrCat(i)] = *it++;
}
values.push_back(std::move(tmp));
}
}
result["values"] = std::move(values);
return result;
}

std::vector<::nlohmann::json> CollectMatchingMetrics(
std::string metric_prefix, bool include_zero_metrics) {
std::vector<::nlohmann::json> lines;
Expand All @@ -104,14 +38,13 @@ std::vector<::nlohmann::json> CollectMatchingMetrics(
internal_metrics::GetMetricRegistry().CollectWithPrefix(metric_prefix)) {
if (include_zero_metrics ||
internal_metrics::IsCollectedMetricNonZero(metric)) {
lines.push_back(CollectedMetricToJson(metric));
lines.push_back(internal_metrics::CollectedMetricToJson(metric));
}
}

std::sort(std::begin(lines), std::end(lines));
return lines;
}

} // namespace

void RegisterMetricBindings(pybind11::module_ m, Executor defer) {
Expand Down
2 changes: 2 additions & 0 deletions tensorstore/internal/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ tensorstore_cc_library(
hdrs = ["collect.h"],
deps = [
":metadata",
"//tensorstore/util:str_cat",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/functional:function_ref",
"@com_google_absl//absl/strings",
],
Expand Down
63 changes: 63 additions & 0 deletions tensorstore/internal/metrics/collect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,68 @@ void FormatCollectedMetric(
}
}

/// Converts a CollectedMetric to json.
::nlohmann::json CollectedMetricToJson(const CollectedMetric& metric) {
::nlohmann::json::object_t result;
result["name"] = metric.metric_name;

auto set_field_keys = [&](auto& v, ::nlohmann::json::object_t& h) {
assert(metric.field_names.size() == v.fields.size());
for (size_t i = 0; i < metric.field_names.size(); ++i) {
if (metric.field_names[i] == "value" ||
metric.field_names[i] == "count" ||
metric.field_names[i] == "max_value" ||
metric.field_names[i] == "sum") {
h[absl::StrCat("_", metric.field_names[i])] = v.fields[i];
} else {
h[std::string(metric.field_names[i])] = v.fields[i];
}
}
};

std::vector<::nlohmann::json> values;
if (!metric.gauges.empty()) {
for (const auto& v : metric.gauges) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
std::visit([&](auto x) { tmp["value"] = x; }, v.value);
std::visit([&](auto x) { tmp["max_value"] = x; }, v.max_value);
values.push_back(std::move(tmp));
}
} else if (!metric.values.empty()) {
for (const auto& v : metric.values) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
std::visit([&](auto x) { tmp["value"] = x; }, v.value);
values.push_back(std::move(tmp));
}
} else if (!metric.counters.empty()) {
for (const auto& v : metric.counters) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
std::visit([&](auto x) { tmp["count"] = x; }, v.value);
values.push_back(std::move(tmp));
}
} else if (!metric.histograms.empty()) {
for (const auto& v : metric.histograms) {
::nlohmann::json::object_t tmp{};
set_field_keys(v, tmp);
tmp["count"] = v.count;
tmp["sum"] = v.sum;

size_t end = v.buckets.size();
while (end > 0 && v.buckets[end - 1] == 0) end--;

auto it = v.buckets.begin();
for (size_t i = 0; i < end; ++i) {
tmp[absl::StrCat(i)] = *it++;
}
values.push_back(std::move(tmp));
}
}
result["values"] = std::move(values);
return result;
}

} // namespace internal_metrics
} // namespace tensorstore
4 changes: 4 additions & 0 deletions tensorstore/internal/metrics/collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "absl/functional/function_ref.h"
#include <nlohmann/json.hpp>
#include "tensorstore/internal/metrics/metadata.h"

namespace tensorstore {
Expand Down Expand Up @@ -70,6 +71,9 @@ void FormatCollectedMetric(
absl::FunctionRef<void(bool has_value, std::string formatted_line)>
handle_line);

/// Converts a CollectedMetric to json.
::nlohmann::json CollectedMetricToJson(const CollectedMetric& metric);

} // namespace internal_metrics
} // namespace tensorstore

Expand Down
6 changes: 6 additions & 0 deletions tensorstore/internal/metrics/counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class ABSL_CACHELINE_ALIGNED Counter {
return *impl_.GetCell(labels...);
}

void Reset() { impl_.Reset(); }

private:
Counter(std::string metric_name, MetricMetadata metadata,
typename Impl::field_names_type field_names)
Expand Down Expand Up @@ -172,6 +174,8 @@ class ABSL_CACHELINE_ALIGNED CounterCell<double> : public CounterTag {

double Get() const { return value_; }

void Reset() { value_ = 0.0; }

private:
std::atomic<double> value_{0.0};
};
Expand All @@ -192,6 +196,8 @@ class ABSL_CACHELINE_ALIGNED CounterCell<int64_t> : public CounterTag {

int64_t Get() const { return value_; }

void Reset() { value_ = 0; }

private:
std::atomic<int64_t> value_{0};
};
Expand Down
16 changes: 16 additions & 0 deletions tensorstore/internal/metrics/gauge.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ class ABSL_CACHELINE_ALIGNED Gauge {
return *impl_.GetCell(labels...);
}

void Reset() { impl_.Reset(); }

private:
Gauge(std::string metric_name, MetricMetadata metadata,
typename Impl::field_names_type field_names)
Expand Down Expand Up @@ -202,6 +204,13 @@ class ABSL_CACHELINE_ALIGNED GaugeCell<double> : public GaugeTag {
double Get() const { return value_; }
double GetMax() const { return max_; }

void Reset() {
// not thread safe
value_ = 0.0;
max_ = 0.0;
SetMax(value_.load());
}

private:
inline void SetMax(double value) {
double h = max_.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -237,6 +246,13 @@ class ABSL_CACHELINE_ALIGNED GaugeCell<int64_t> : public GaugeTag {
int64_t Get() const { return value_; }
int64_t GetMax() const { return max_; }

void Reset() {
// not thread safe
value_ = 0;
max_ = 0;
SetMax(value_.load());
}

private:
inline void SetMax(int64_t value) {
int64_t h = max_.load(std::memory_order_relaxed);
Expand Down
11 changes: 11 additions & 0 deletions tensorstore/internal/metrics/histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class ABSL_CACHELINE_ALIGNED Histogram {
return *impl_.GetCell(labels...);
}

void Reset() { impl_.Reset(); }

private:
Histogram(std::string metric_name, MetricMetadata metadata,
typename Impl::field_names_type field_names)
Expand Down Expand Up @@ -212,6 +214,15 @@ class ABSL_CACHELINE_ALIGNED HistogramCell : public Bucketer {
return buckets_[idx];
}

void Reset() {
// There is potential inconsistency between count/sum/bucket
sum_ = 0.0;
count_ = 0;
for (auto& b : buckets_) {
b = 0;
}
}

private:
std::atomic<double> sum_{0.0};
std::atomic<int64_t> count_{0};
Expand Down
15 changes: 15 additions & 0 deletions tensorstore/internal/metrics/metric_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,13 @@ class AbstractMetric : public AbstractMetricBase<sizeof...(Fields)> {
}
}

void Reset() {
absl::MutexLock l(&mu_);
for (auto& [k, v] : impl_) {
v.Reset();
}
}

private:
// NOTE: It would be nice to have a lock-free hashtable here.
mutable absl::Mutex mu_;
Expand Down Expand Up @@ -256,6 +263,8 @@ class AbstractMetric<Cell> : public AbstractMetricBase<0> {
on_cell(impl_, g);
}

void Reset() { impl_.Reset(); }

private:
Cell impl_;
};
Expand Down Expand Up @@ -290,6 +299,12 @@ class AbstractMetric<Cell, bool> : public AbstractMetricBase<1> {
on_cell(false_impl_, field_values_type{false});
}

void Reset() {
// not thread safe
true_impl_.Reset();
false_impl_.Reset();
}

private:
Cell true_impl_;
Cell false_impl_;
Expand Down
11 changes: 9 additions & 2 deletions tensorstore/internal/metrics/registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ std::vector<CollectedMetric> MetricRegistry::CollectWithPrefix(
absl::MutexLock l(&mu_);
for (auto& kv : entries_) {
if (prefix.empty() || absl::StartsWith(kv.first, prefix)) {
auto opt_metric = kv.second.poly();
auto opt_metric = kv.second.poly(CollectMetricTag{});
if (opt_metric.has_value()) {
all.emplace_back(std::move(*opt_metric));
assert(all.back().metric_name == kv.first);
Expand All @@ -60,7 +60,7 @@ std::optional<CollectedMetric> MetricRegistry::Collect(std::string_view name) {
absl::MutexLock l(&mu_);
auto it = entries_.find(name);
if (it == entries_.end()) return std::nullopt;
auto opt_metric = it->second.poly();
auto opt_metric = it->second.poly(CollectMetricTag{});
assert(!opt_metric.has_value() || opt_metric->metric_name == it->first);
return opt_metric;
}
Expand All @@ -70,5 +70,12 @@ MetricRegistry& GetMetricRegistry() {
return *registry;
}

void MetricRegistry::Reset() {
absl::MutexLock l(&mu_);
for (auto& [k, v] : entries_) {
v.poly(ResetMetricTag{});
}
}

} // namespace internal_metrics
} // namespace tensorstore
Loading

0 comments on commit c78ccb1

Please sign in to comment.