Commit

chore(sync-service): Separate stack telemetry from application telemetry (#2337)

Our cloud version of Electric has multiple stacks, one per
tenant/database. Since we want metrics from each tenant, this PR
separates the stack/tenant-specific telemetry from the
application/system telemetry. For cloud Electric we will then have one
application metric message and multiple stack metric messages sent
each time we poll.
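
In supervision-tree terms the split looks roughly like the sketch below. The module names and child-spec options are taken from the diffs further down; the wrapping module and function names are invented for illustration only.

defmodule TelemetryTreeSketch do
  # Hypothetical illustration of where each telemetry supervisor lives after
  # this change; only the child specs are copied from the real diffs.

  # Started once per node by Electric.Application: BEAM, CPU and memory metrics.
  def application_children do
    [
      {Electric.Telemetry.ApplicationTelemetry, []}
    ]
  end

  # Started once per stack by Electric.StackSupervisor: shape, storage and
  # replication metrics, tagged with that stack's stack_id so each tenant
  # produces its own metrics message on every poll.
  def stack_children(stack_id, storage) do
    [
      {Electric.Telemetry.StackTelemetry, stack_id: stack_id, storage: storage}
    ]
  end
end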
robacourt authored Feb 17, 2025
1 parent bf25e67 commit 7cb4ccb
Showing 11 changed files with 347 additions and 138 deletions.
5 changes: 5 additions & 0 deletions .changeset/grumpy-cheetahs-hear.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Electric as a library: Support multiple stacks
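
A rough sketch of what "multiple stacks" could look like for an application embedding Electric as a library: two stacks run side by side, and with this commit each one brings its own StackTelemetry. The stack_id and storage options appear in the diffs below; the wrapping module, storage_for/1 and the exact option set Electric.StackSupervisor requires are assumptions, not documented API.

defmodule MultiStackSketch do
  def start do
    children =
      for stack_id <- ["tenant_a", "tenant_b"] do
        # Distinct child ids are needed because both children use the same module.
        Supervisor.child_spec(
          {Electric.StackSupervisor, stack_id: stack_id, storage: storage_for(stack_id)},
          id: {:stack, stack_id}
        )
      end

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  # Stand-in for however the embedding application builds its per-stack
  # storage configuration; not part of Electric's API.
  defp storage_for(_stack_id), do: raise("configure storage per stack")
end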
25 changes: 25 additions & 0 deletions packages/sync-service/array.sql
@@ -0,0 +1,25 @@
DROP TABLE IF EXISTS temp_values;

CREATE TEMP TABLE temp_values ( value text[]);

DO
$do$
BEGIN
FOR i IN 1..40000 LOOP
INSERT INTO temp_values (value)
SELECT ARRAY(
SELECT floor(random() * 100)::text
FROM generate_series(1, 5)
);
END LOOP;
END
$do$;

EXPLAIN ANALYZE
SELECT *
FROM temp_values
WHERE ARRAY(
SELECT floor(random() * 100)::text
FROM generate_series(1, 100)
) @> value;

6 changes: 5 additions & 1 deletion packages/sync-service/config/runtime.exs
@@ -83,7 +83,11 @@ if otlp_endpoint do
otlp_endpoint: otlp_endpoint,
otlp_headers: Map.new(headers),
otlp_compression: :gzip,
resource: %{service: %{name: service_name, version: version}, instance: %{id: instance_id}}
resource: %{
name: "metrics",
service: %{name: service_name, version: version},
instance: %{id: instance_id}
}
end

otel_batch_processor =
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/application.ex
@@ -78,7 +78,7 @@ defmodule Electric.Application do
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold),
name: Electric.StackSupervisor
},
{Electric.Telemetry, stack_id: stack_id, storage: storage},
{Electric.Telemetry.ApplicationTelemetry, []},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
port: Electric.Config.get_env(:service_port),
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/stack_supervisor.ex
@@ -277,7 +277,8 @@ defmodule Electric.StackSupervisor do
{Registry,
name: shape_changes_registry_name, keys: :duplicate, partitions: registry_partitions},
{Electric.Postgres.Inspector.EtsInspector, stack_id: stack_id, pool: db_pool},
{Electric.Connection.Supervisor, new_connection_manager_opts}
{Electric.Connection.Supervisor, new_connection_manager_opts},
{Electric.Telemetry.StackTelemetry, stack_id: stack_id, storage: config.storage}
]

# Store the telemetry span attributes in the persistent term for this stack
@@ -1,21 +1,28 @@
defmodule Electric.Telemetry do
defmodule Electric.Telemetry.ApplicationTelemetry do
@moduledoc """
Collects and exports application level telemetry such as CPU, memory and BEAM metrics.
See also StackTelemetry for stack specific telemetry.
"""
use Supervisor

import Telemetry.Metrics

require Logger

def start_link(init_arg) do
def start_link(opts) do
if Electric.Config.telemetry_export_enabled?() do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
else
# Avoid starting the telemetry supervisor and its telemetry_poller child if we're not
# intending to export periodic measurements metrics anywhere.
:ignore
end
end

def init(opts) do
def init(_opts) do
Process.set_label(:application_telemetry_supervisor)

system_metrics_poll_interval = Electric.Config.get_env(:system_metrics_poll_interval)
statsd_host = Electric.Config.get_env(:telemetry_statsd_host)
prometheus? = not is_nil(Electric.Config.get_env(:prometheus_port))
@@ -27,7 +34,7 @@ defmodule Electric.Telemetry do
# and add its default measurements to the list of our custom ones. This allows for all
# periodic measurements to be defined in one place.
{:telemetry_poller,
measurements: periodic_measurements(opts),
measurements: periodic_measurements(),
period: system_metrics_poll_interval,
init_delay: :timer.seconds(5)},
statsd_reporter_child_spec(statsd_host),
@@ -52,8 +59,7 @@ defmodule Electric.Telemetry do
static_info: static_info(),
metrics: call_home_metrics(),
first_report_in: {2, :minute},
reporting_period: {30, :minute},
reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1}
reporting_period: {30, :minute}}
end

def static_info() do
@@ -78,12 +84,6 @@ defmodule Electric.Telemetry do
# make sure you also change the receiver
def call_home_metrics() do
[
environment: [
pg_version:
last_value("electric.postgres.info_looked_up.pg_version",
reporter_options: [persist_between_sends: true]
)
],
resources: [
uptime:
last_value("vm.uptime.total",
@@ -107,39 +107,6 @@ defmodule Electric.Telemetry do
swap_used: last_value("system.swap.used"),
swap_free_percent: last_value("system.swap_percent.free"),
swap_used_percent: last_value("system.swap_percent.used")
],
usage: [
inbound_bytes:
sum("electric.postgres.replication.transaction_received.bytes", unit: :byte),
inbound_transactions: sum("electric.postgres.replication.transaction_received.count"),
inbound_operations: sum("electric.postgres.replication.transaction_received.operations"),
stored_bytes: sum("electric.storage.transaction_stored.bytes", unit: :byte),
stored_transactions: sum("electric.storage.transaction_stored.count"),
stored_operations: sum("electric.storage.transaction_stored.operations"),
total_used_storage_kb: last_value("electric.storage.used", unit: {:byte, :kilobyte}),
total_shapes: last_value("electric.shapes.total_shapes.count"),
active_shapes:
summary("electric.plug.serve_shape.monotonic_time",
unit: :unique,
reporter_options: [count_unique: :shape_handle],
keep: &(&1.status < 300)
),
unique_clients:
summary("electric.plug.serve_shape.monotonic_time",
unit: :unique,
reporter_options: [count_unique: :client_ip],
keep: &(&1.status < 300)
),
sync_requests:
counter("electric.plug.serve_shape.monotonic_time",
drop: &(Map.get(&1, :live, false) || false)
),
live_requests:
counter("electric.plug.serve_shape.monotonic_time",
keep: &(Map.get(&1, :live, false) || false)
),
served_bytes: sum("electric.plug.serve_shape.bytes", unit: :byte),
wal_size: summary("electric.postgres.replication.wal_size", unit: :byte)
]
]
end
@@ -169,21 +136,6 @@ defmodule Electric.Telemetry do
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
last_value("vm.total_run_queue_lengths.io"),
summary("plug.router_dispatch.stop.duration",
tags: [:route],
unit: {:native, :millisecond}
),
summary("plug.router_dispatch.exception.duration",
tags: [:route],
unit: {:native, :millisecond}
),
summary("electric.shape_cache.create_snapshot_task.stop.duration",
unit: {:native, :millisecond}
),
summary("electric.storage.make_new_snapshot.stop.duration", unit: {:native, :millisecond}),
summary("electric.querying.stream_initial_data.stop.duration",
unit: {:native, :millisecond}
),
last_value("system.load_percent.avg1"),
last_value("system.load_percent.avg5"),
last_value("system.load_percent.avg15"),
@@ -199,10 +151,6 @@
[
last_value("system.cpu.core_count"),
last_value("system.cpu.utilization.total"),
last_value("electric.postgres.replication.wal_size", unit: :byte),
last_value("electric.storage.used", unit: {:byte, :kilobyte}),
last_value("electric.shapes.total_shapes.count"),
last_value("vm.memory.total", unit: :byte),
last_value("vm.memory.processes_used", unit: :byte),
last_value("vm.memory.binary", unit: :byte),
last_value("vm.memory.ets", unit: :byte),
@@ -212,15 +160,16 @@
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
last_value("vm.total_run_queue_lengths.io"),
last_value("electric.postgres.replication.wal_size", unit: :byte),
counter("electric.postgres.replication.transaction_received.count"),
sum("electric.postgres.replication.transaction_received.bytes", unit: :byte),
sum("electric.storage.transaction_stored.bytes", unit: :byte)
last_value("vm.uptime.total",
unit: :second,
measurement: &:erlang.convert_time_unit(&1.total, :native, :second)
),
last_value("vm.memory.total", unit: :byte)
] ++
Enum.map(
# Add "system.cpu.utilization.core_*" but since there's no wildcard support we
# explicitly add 64 cores. If there are fewer cores, the missing ones will be ignored.
0..63,
# explicitly add the cores here.
0..(:erlang.system_info(:logical_processors) - 1),
&last_value("system.cpu.utilization.core_#{&1}")
)
end
@@ -229,22 +178,11 @@ defmodule Electric.Telemetry do
[
last_value("system.load_percent.avg1"),
last_value("system.load_percent.avg5"),
last_value("system.load_percent.avg15"),
distribution("electric.plug.serve_shape.duration",
drop: &(Map.get(&1, :live, false) || false)
),
distribution("electric.shape_cache.create_snapshot_task.stop.duration",
unit: {:native, :millisecond}
),
distribution("electric.storage.make_new_snapshot.stop.duration",
unit: {:native, :millisecond}
)
last_value("system.load_percent.avg15")
] ++ prometheus_metrics()
end

defp periodic_measurements(opts) do
stack_id = Keyword.fetch!(opts, :stack_id)

defp periodic_measurements() do
[
# Default measurements included with the telemetry_poller application:
:memory,
@@ -253,13 +191,9 @@

# Our custom measurements:
{__MODULE__, :uptime_event, []},
{__MODULE__, :count_shapes, [stack_id]},
{__MODULE__, :cpu_utilization, []},
{__MODULE__, :get_total_disk_usage, [opts]},
{__MODULE__, :get_system_load_average, [opts]},
{__MODULE__, :get_system_memory_usage, [opts]},
{Electric.Connection.Manager, :report_retained_wal_size,
[Electric.Connection.Manager.name(stack_id)]}
{__MODULE__, :get_system_load_average, []},
{__MODULE__, :get_system_memory_usage, []}
]
end

@@ -269,28 +203,6 @@
})
end

def count_shapes(stack_id) do
Electric.ShapeCache.list_shapes(stack_id: stack_id)
|> length()
|> then(
&:telemetry.execute([:electric, :shapes, :total_shapes], %{count: &1}, %{stack_id: stack_id})
)
end

def get_total_disk_usage(opts) do
storage = Electric.StackSupervisor.storage_mod_arg(Map.new(opts))

Electric.ShapeCache.Storage.get_total_disk_usage(storage)
|> then(
&:telemetry.execute([:electric, :storage], %{used: &1}, %{
stack_id: opts[:stack_id]
})
)
catch
:exit, {:noproc, _} ->
:ok
end

def cpu_utilization do
cores =
:cpu_sup.util([:per_cpu])
@@ -303,7 +215,7 @@
:telemetry.execute([:system, :cpu], %{core_count: Enum.count(cores)})
end

def get_system_load_average(opts) do
def get_system_load_average do
cores = :erlang.system_info(:logical_processors)

# > The load values are proportional to how long time a runnable Unix
@@ -340,18 +252,12 @@
|> Map.new(fn probe ->
{probe, 100 * (apply(:cpu_sup, probe, []) / 256 / cores)}
end)
|> then(
&:telemetry.execute([:system, :load_percent], &1, %{
stack_id: opts[:stack_id]
})
)
|> then(&:telemetry.execute([:system, :load_percent], &1))
end

@required_system_memory_keys ~w[system_total_memory free_memory]a

def get_system_memory_usage(opts) do
metadata = %{stack_id: opts[:stack_id]}

def get_system_memory_usage() do
system_memory = Map.new(:memsup.get_system_memory_data())

# Sanity-check that all the required keys are present before doing any arithmetic on them
@@ -387,13 +293,13 @@

mem_stats = Map.put(mem_stats, :total_memory, total)

:telemetry.execute([:system, :memory], mem_stats, metadata)
:telemetry.execute([:system, :memory_percent], mem_percent_stats, metadata)
:telemetry.execute([:system, :memory], mem_stats)
:telemetry.execute([:system, :memory_percent], mem_percent_stats)

mem_stats
end

Map.merge(mem_stats, swap_stats(:os.type(), system_memory, metadata))
Map.merge(mem_stats, swap_stats(:os.type(), system_memory))
end

defp resident_memory(%{available_memory: available_memory}) do
@@ -423,12 +329,12 @@
%{}
end

defp swap_stats({:unix, :darwin}, _system_memory, _metadata) do
defp swap_stats({:unix, :darwin}, _system_memory) do
# On macOS, swap stats are not available
%{}
end

defp swap_stats(_os_type, %{total_swap: total, free_swap: free}, metadata) do
defp swap_stats(_os_type, %{total_swap: total, free_swap: free}) do
used = total - free

swap_stats = %{total_swap: total, free_swap: free, used_swap: used}
@@ -440,14 +346,14 @@
%{free_swap: 0, used_swap: 0}
end

:telemetry.execute([:system, :swap], swap_stats, metadata)
:telemetry.execute([:system, :swap_percent], swap_percent_stats, metadata)
:telemetry.execute([:system, :swap], swap_stats)
:telemetry.execute([:system, :swap_percent], swap_percent_stats)

swap_stats
end

@required_swap_keys ~w[total_swap free_swap]a
defp swap_stats(_os_type, system_memory, _metadata) do
defp swap_stats(_os_type, system_memory) do
missing_swap_keys = Enum.reject(@required_swap_keys, &Map.has_key?(system_memory, &1))

Logger.warning(
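
The stack-scoped measurements deleted above are not dropped: per the commit message they move into the new Electric.Telemetry.StackTelemetry started by the StackSupervisor (its file is among the changes not expanded on this page). A sketch of what its periodic-measurement list plausibly contains, reconstructed from the entries removed above; this is a fragment of an assumed StackTelemetry module, not the committed code.

defp periodic_measurements(opts) do
  stack_id = Keyword.fetch!(opts, :stack_id)

  [
    # Each measurement carries the stack_id, so every stack/tenant emits its
    # own metrics message when the poller fires.
    {__MODULE__, :count_shapes, [stack_id]},
    {__MODULE__, :get_total_disk_usage, [opts]},
    {Electric.Connection.Manager, :report_retained_wal_size,
     [Electric.Connection.Manager.name(stack_id)]}
  ]
end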
@@ -18,7 +18,7 @@ defmodule Electric.Telemetry.CallHomeReporter do
static_info = Keyword.get(opts, :static_info, %{})
first_report_in = cast_time_to_ms(Keyword.fetch!(opts, :first_report_in))
reporting_period = cast_time_to_ms(Keyword.fetch!(opts, :reporting_period))
reporter_fn = Keyword.fetch!(opts, :reporter_fn)
reporter_fn = Keyword.get(opts, :reporter_fn, &report_home/1)

GenServer.start_link(
__MODULE__,
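
Because reporter_fn now defaults to &report_home/1, the child spec in ApplicationTelemetry (earlier in this diff) can omit the option, while a test or custom deployment can still inject its own reporter. A hedged usage sketch follows; the static_info and metrics values are placeholders, not the real configuration.

# Capture the calling (e.g. test) process so the injected reporter_fn can
# forward reports to it instead of phoning home.
test_pid = self()

{Electric.Telemetry.CallHomeReporter,
 static_info: %{},
 metrics: [],
 first_report_in: {2, :minute},
 reporting_period: {30, :minute},
 reporter_fn: fn report -> send(test_pid, {:call_home_report, report}) end}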
(The diffs for the remaining changed files are not expanded on this page.)
