Skip to content

Commit

Permalink
chore(sync-service): Isolate telemetry config (#2344)
Browse files Browse the repository at this point in the history
Following @msfstef 's comments on [this
PR](electric-sql/stratovolt#311) - This PR
separates the telemetry config from application environment config
  • Loading branch information
robacourt authored Feb 17, 2025
1 parent 93cef8a commit 329c428
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 96 deletions.
5 changes: 5 additions & 0 deletions .changeset/popular-shrimps-ring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Electric as a library: Telemetry config is now option parameters rather than application environemnt config
14 changes: 12 additions & 2 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ defmodule Electric.Application do
[]
end

telemetry_opts = [
instance_id: Electric.instance_id(),
system_metrics_poll_interval: Electric.Config.get_env(:system_metrics_poll_interval),
statsd_host: Electric.Config.get_env(:telemetry_statsd_host),
prometheus?: not is_nil(Electric.Config.get_env(:prometheus_port)),
call_home_telemetry?: Electric.Config.get_env(:call_home_telemetry?),
otel_metrics?: not is_nil(Application.get_env(:otel_metric_exporter, :otlp_endpoint))
]

# The root application supervisor starts the core global processes, including the HTTP
# server and the database connection manager. The latter is responsible for establishing
# all needed connections to the database (acquiring the exclusive access lock, opening a
Expand Down Expand Up @@ -88,9 +97,10 @@ defmodule Electric.Application do
pool_opts: [pool_size: Electric.Config.get_env(:db_pool_size)],
storage: storage,
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold),
name: Electric.StackSupervisor
name: Electric.StackSupervisor,
telemetry_opts: telemetry_opts
},
{Electric.Telemetry.ApplicationTelemetry, []}
{Electric.Telemetry.ApplicationTelemetry, telemetry_opts}
],
api_server,
prometheus_endpoint(Electric.Config.get_env(:prometheus_port))
Expand Down
13 changes: 0 additions & 13 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,6 @@ defmodule Electric.Config do
Application.fetch_env!(:electric, key)
end

@doc """
True when at least one metric exporter is enabled.
This function is used to skip starting the Electric.Telemetry supervisor when there's no need
to capture periodic measurements. Useful in the dev and test environments.
"""
def telemetry_export_enabled? do
not is_nil(Electric.Config.get_env(:telemetry_statsd_host)) or
not is_nil(Electric.Config.get_env(:prometheus_port)) or
Electric.Config.get_env(:call_home_telemetry?) or
not is_nil(Application.get_env(:otel_metric_exporter, :otlp_endpoint))
end

@doc ~S"""
Parse a PostgreSQL URI into a keyword list.
Expand Down
4 changes: 3 additions & 1 deletion packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ defmodule Electric.StackSupervisor do
registry_partitions: [type: :non_neg_integer, required: false]
]
],
telemetry_opts: [type: :keyword_list, default: []],
telemetry_span_attrs: [
# Validates the OpenTelemetry.attributes_map() type
# cf. https://github.com/open-telemetry/opentelemetry-erlang/blob/9f7affe630676d2803b04f69d0c759effb6e0245/apps/opentelemetry_api/src/opentelemetry.erl#L118
Expand Down Expand Up @@ -278,7 +279,8 @@ defmodule Electric.StackSupervisor do
name: shape_changes_registry_name, keys: :duplicate, partitions: registry_partitions},
{Electric.Postgres.Inspector.EtsInspector, stack_id: stack_id, pool: db_pool},
{Electric.Connection.Supervisor, new_connection_manager_opts},
{Electric.Telemetry.StackTelemetry, stack_id: stack_id, storage: config.storage}
{Electric.Telemetry.StackTelemetry,
config.telemetry_opts ++ [stack_id: stack_id, storage: config.storage]}
]

# Store the telemetry span attributes in the persistent term for this stack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,58 +10,67 @@ defmodule Electric.Telemetry.ApplicationTelemetry do

require Logger

@opts_schema NimbleOptions.new!(Electric.Telemetry.Opts.schema())

def start_link(opts) do
if Electric.Config.telemetry_export_enabled?() do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
else
# Avoid starting the telemetry supervisor and its telemetry_poller child if we're not
# intending to export periodic measurements metrics anywhere.
:ignore
with {:ok, opts} <- NimbleOptions.validate(opts, @opts_schema) do
if telemetry_export_enabled?(Map.new(opts)) do
Supervisor.start_link(__MODULE__, Map.new(opts), name: __MODULE__)
else
# Avoid starting the telemetry supervisor and its telemetry_poller child if we're not
# intending to export periodic measurements metrics anywhere.
:ignore
end
end
end

def init(_opts) do
def init(opts) do
Process.set_label(:application_telemetry_supervisor)

system_metrics_poll_interval = Electric.Config.get_env(:system_metrics_poll_interval)
statsd_host = Electric.Config.get_env(:telemetry_statsd_host)
prometheus? = not is_nil(Electric.Config.get_env(:prometheus_port))
call_home_telemetry? = Electric.Config.get_env(:call_home_telemetry?)
otel_metrics? = not is_nil(Application.get_env(:otel_metric_exporter, :otlp_endpoint))
[telemetry_poller_child_spec(opts) | exporter_child_specs(opts)]
|> Supervisor.init(strategy: :one_for_one)
end

defp telemetry_poller_child_spec(opts) do
# The telemetry_poller application starts its own poller by default but we disable that
# and add its default measurements to the list of our custom ones. This allows for all
# periodic measurements to be defined in one place.
{:telemetry_poller,
measurements: periodic_measurements(),
period: opts.system_metrics_poll_interval,
init_delay: :timer.seconds(5)}
end

defp telemetry_export_enabled?(opts) do
exporter_child_specs(opts) != []
end

defp exporter_child_specs(opts) do
[
# The telemetry_poller application starts its own poller by default but we disable that
# and add its default measurements to the list of our custom ones. This allows for all
# periodic measurements to be defined in one place.
{:telemetry_poller,
measurements: periodic_measurements(),
period: system_metrics_poll_interval,
init_delay: :timer.seconds(5)},
statsd_reporter_child_spec(statsd_host),
prometheus_reporter_child_spec(prometheus?),
call_home_reporter_child_spec(call_home_telemetry?),
otel_reporter_child_spec(otel_metrics?)
statsd_reporter_child_spec(opts),
prometheus_reporter_child_spec(opts),
call_home_reporter_child_spec(opts),
otel_reporter_child_spec(opts)
]
|> Enum.reject(&is_nil/1)
|> Supervisor.init(strategy: :one_for_one)
end

defp otel_reporter_child_spec(true) do
defp otel_reporter_child_spec(%{otel_metrics?: true}) do
{OtelMetricExporter, metrics: otel_metrics(), export_period: :timer.seconds(30)}
end

defp otel_reporter_child_spec(false), do: nil

defp call_home_reporter_child_spec(false), do: nil
defp otel_reporter_child_spec(_), do: nil

defp call_home_reporter_child_spec(true) do
defp call_home_reporter_child_spec(%{call_home_telemetry?: true}) do
{Electric.Telemetry.CallHomeReporter,
static_info: static_info(),
metrics: call_home_metrics(),
first_report_in: {2, :minute},
reporting_period: {30, :minute}}
end

defp call_home_reporter_child_spec(_), do: nil

def static_info() do
{total_mem, _, _} = :memsup.get_memory_data()
processors = :erlang.system_info(:logical_processors)
Expand Down Expand Up @@ -111,22 +120,22 @@ defmodule Electric.Telemetry.ApplicationTelemetry do
]
end

defp statsd_reporter_child_spec(nil), do: nil

defp statsd_reporter_child_spec(host) do
defp statsd_reporter_child_spec(%{statsd_host: host} = opts) when host != nil do
{TelemetryMetricsStatsd,
host: host,
formatter: :datadog,
global_tags: [instance_id: Electric.instance_id()],
global_tags: [instance_id: opts.instance_id],
metrics: statsd_metrics()}
end

defp prometheus_reporter_child_spec(false), do: nil
defp statsd_reporter_child_spec(_), do: nil

defp prometheus_reporter_child_spec(true) do
defp prometheus_reporter_child_spec(%{prometheus?: true}) do
{TelemetryMetricsPrometheus.Core, metrics: prometheus_metrics()}
end

defp prometheus_reporter_child_spec(_), do: nil

defp statsd_metrics() do
[
last_value("vm.memory.total", unit: :byte),
Expand Down
12 changes: 12 additions & 0 deletions packages/sync-service/lib/electric/telemetry/opts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Electric.Telemetry.Opts do
def schema do
[
instance_id: [type: :string],
system_metrics_poll_interval: [type: :integer, default: :timer.seconds(5)],
statsd_host: [type: {:or, [:string, nil]}, default: nil],
prometheus?: [type: :boolean, default: false],
call_home_telemetry?: [type: :boolean, default: false],
otel_metrics?: [type: :boolean, default: false]
]
end
end
104 changes: 59 additions & 45 deletions packages/sync-service/lib/electric/telemetry/stack_telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,78 @@ defmodule Electric.Telemetry.StackTelemetry do

require Logger

@opts_schema NimbleOptions.new!(
Electric.Telemetry.Opts.schema() ++
[
stack_id: [type: :string, required: true],
storage: [type: :mod_arg, required: true]
]
)

def start_link(opts) do
if Electric.Config.telemetry_export_enabled?() do
Supervisor.start_link(__MODULE__, opts)
else
# Avoid starting the telemetry supervisor and its telemetry_poller child if we're not
# intending to export periodic measurements metrics anywhere.
:ignore
with {:ok, opts} <- NimbleOptions.validate(opts, @opts_schema) do
if telemetry_export_enabled?(Map.new(opts)) do
Supervisor.start_link(__MODULE__, Map.new(opts))
else
# Avoid starting the telemetry supervisor and its telemetry_poller child if we're not
# intending to export periodic measurements metrics anywhere.
:ignore
end
end
end

def init(opts) do
Process.set_label({:stack_telemetry_supervisor, stack_id(opts)})
Process.set_label({:stack_telemetry_supervisor, opts.stack_id})

[telemetry_poller_child_spec(opts) | exporter_child_specs(opts)]
|> Supervisor.init(strategy: :one_for_one)
end

system_metrics_poll_interval = Electric.Config.get_env(:system_metrics_poll_interval)
statsd_host = Electric.Config.get_env(:telemetry_statsd_host)
prometheus? = not is_nil(Electric.Config.get_env(:prometheus_port))
call_home_telemetry? = Electric.Config.get_env(:call_home_telemetry?)
otel_metrics? = not is_nil(Application.get_env(:otel_metric_exporter, :otlp_endpoint))
defp telemetry_poller_child_spec(opts) do
# The telemetry_poller application starts its own poller by default but we disable that
# and add its default measurements to the list of our custom ones. This allows for all
# periodic measurements to be defined in one place.
{:telemetry_poller,
measurements: periodic_measurements(opts),
period: opts.system_metrics_poll_interval,
init_delay: :timer.seconds(3)}
end

defp telemetry_export_enabled?(opts) do
exporter_child_specs(opts) != []
end

defp exporter_child_specs(opts) do
[
{:telemetry_poller,
measurements: periodic_measurements(opts),
period: system_metrics_poll_interval,
init_delay: :timer.seconds(3)},
statsd_reporter_child_spec(statsd_host, opts),
prometheus_reporter_child_spec(prometheus?, opts),
call_home_reporter_child_spec(call_home_telemetry?, opts),
otel_reporter_child_spec(otel_metrics?, opts)
statsd_reporter_child_spec(opts),
prometheus_reporter_child_spec(opts),
call_home_reporter_child_spec(opts),
otel_reporter_child_spec(opts)
]
|> Enum.reject(&is_nil/1)
|> Supervisor.init(strategy: :one_for_one)
end

defp otel_reporter_child_spec(true, opts) do
defp otel_reporter_child_spec(%{otel_metrics?: true} = opts) do
{OtelMetricExporter,
name: :"stack_otel_telemetry_#{stack_id(opts)}",
name: :"stack_otel_telemetry_#{opts.stack_id}",
metrics: otel_metrics(opts),
export_period: :timer.seconds(30),
resource: %{stack_id: stack_id(opts)}}
resource: %{stack_id: opts.stack_id}}
end

defp otel_reporter_child_spec(false, _opts), do: nil

defp call_home_reporter_child_spec(false, _opts), do: nil
defp otel_reporter_child_spec(_), do: nil

defp call_home_reporter_child_spec(true, opts) do
defp call_home_reporter_child_spec(%{call_home_telemetry?: true} = opts) do
{Electric.Telemetry.CallHomeReporter,
name: :"stack_call_home_telemetry_#{stack_id(opts)}",
name: :"stack_call_home_telemetry_#{opts.stack_id}",
static_info: static_info(opts),
metrics: call_home_metrics(opts),
first_report_in: {2, :minute},
reporting_period: {30, :minute}}
end

defp call_home_reporter_child_spec(_), do: nil

def static_info(opts) do
{total_mem, _, _} = :memsup.get_memory_data()
processors = :erlang.system_info(:logical_processors)
Expand All @@ -80,7 +98,7 @@ defmodule Electric.Telemetry.StackTelemetry do
cores: processors,
ram: total_mem,
electric_instance_id: Electric.instance_id(),
stack_id: stack_id(opts)
stack_id: opts.stack_id
}
}
end
Expand Down Expand Up @@ -141,23 +159,23 @@ defmodule Electric.Telemetry.StackTelemetry do
]
end

defp statsd_reporter_child_spec(nil, _opts), do: nil

defp statsd_reporter_child_spec(host, opts) do
defp statsd_reporter_child_spec(%{statsd_host: host} = opts) when host != nil do
{TelemetryMetricsStatsd,
host: host,
formatter: :datadog,
global_tags: [instance_id: Electric.instance_id()],
global_tags: [instance_id: opts.instance_id],
metrics: statsd_metrics(opts)}
end

defp prometheus_reporter_child_spec(false, _opts), do: nil
defp statsd_reporter_child_spec(_), do: nil

defp prometheus_reporter_child_spec(true, opts) do
defp prometheus_reporter_child_spec(%{prometheus?: true} = opts) do
{TelemetryMetricsPrometheus.Core,
name: :"stack_prometheus_telemetry_#{stack_id(opts)}", metrics: prometheus_metrics(opts)}
name: :"stack_prometheus_telemetry_#{opts.stack_id}", metrics: prometheus_metrics(opts)}
end

defp prometheus_reporter_child_spec(_), do: nil

defp statsd_metrics(opts) do
[
summary("plug.router_dispatch.stop.duration",
Expand Down Expand Up @@ -221,10 +239,10 @@ defmodule Electric.Telemetry.StackTelemetry do

defp periodic_measurements(opts) do
[
{__MODULE__, :count_shapes, [stack_id(opts)]},
{__MODULE__, :count_shapes, [opts.stack_id]},
{__MODULE__, :get_total_disk_usage, [opts]},
{Electric.Connection.Manager, :report_retained_wal_size,
[Electric.Connection.Manager.name(stack_id(opts))]}
[Electric.Connection.Manager.name(opts.stack_id)]}
]
end

Expand All @@ -237,7 +255,7 @@ defmodule Electric.Telemetry.StackTelemetry do
end

def get_total_disk_usage(opts) do
storage = Electric.StackSupervisor.storage_mod_arg(Map.new(opts))
storage = Electric.StackSupervisor.storage_mod_arg(opts)

Electric.ShapeCache.Storage.get_total_disk_usage(storage)
|> then(
Expand All @@ -250,12 +268,8 @@ defmodule Electric.Telemetry.StackTelemetry do
:ok
end

defp stack_id(opts) do
Keyword.fetch!(opts, :stack_id)
end

def for_stack(opts) do
stack_id = stack_id(opts)
stack_id = opts.stack_id

fn metadata ->
metadata[:stack_id] == stack_id
Expand Down

0 comments on commit 329c428

Please sign in to comment.