diff --git a/.changeset/grumpy-cheetahs-hear.md b/.changeset/grumpy-cheetahs-hear.md new file mode 100644 index 0000000000..f5fabebb19 --- /dev/null +++ b/.changeset/grumpy-cheetahs-hear.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Electric as a library: Support multiple stacks diff --git a/packages/sync-service/array.sql b/packages/sync-service/array.sql new file mode 100644 index 0000000000..85af8e2897 --- /dev/null +++ b/packages/sync-service/array.sql @@ -0,0 +1,25 @@ +DROP TABLE temp_values; + +CREATE TEMP TABLE temp_values ( value text[]); + +DO +$do$ +BEGIN + FOR i IN 1..40000 LOOP + INSERT INTO temp_values (value) + SELECT ARRAY( + SELECT floor(random() * 100)::text + FROM generate_series(1, 5) + ); + END LOOP; +END +$do$; + +EXPLAIN ANALYZE +SELECT * +FROM temp_values +WHERE ARRAY( + SELECT floor(random() * 100)::text + FROM generate_series(1, 100) +) @> value; + diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index df16a4b973..edde570e44 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -83,7 +83,11 @@ if otlp_endpoint do otlp_endpoint: otlp_endpoint, otlp_headers: Map.new(headers), otlp_compression: :gzip, - resource: %{service: %{name: service_name, version: version}, instance: %{id: instance_id}} + resource: %{ + name: "metrics", + service: %{name: service_name, version: version}, + instance: %{id: instance_id} + } end otel_batch_processor = diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 052a2b3e62..d1367873dc 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -78,7 +78,7 @@ defmodule Electric.Application do chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold), name: Electric.StackSupervisor }, - {Electric.Telemetry, stack_id: stack_id, storage: storage}, + {Electric.Telemetry.ApplicationTelemetry, []}, {Bandit, plug: {Electric.Plug.Router, router_opts}, port: Electric.Config.get_env(:service_port), diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index a74b1d64db..a8c0ba9b4f 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -277,7 +277,8 @@ defmodule Electric.StackSupervisor do {Registry, name: shape_changes_registry_name, keys: :duplicate, partitions: registry_partitions}, {Electric.Postgres.Inspector.EtsInspector, stack_id: stack_id, pool: db_pool}, - {Electric.Connection.Supervisor, new_connection_manager_opts} + {Electric.Connection.Supervisor, new_connection_manager_opts}, + {Electric.Telemetry.StackTelemetry, stack_id: stack_id, storage: config.storage} ] # Store the telemetry span attributes in the persistent term for this stack diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry/application_telemetry.ex similarity index 67% rename from packages/sync-service/lib/electric/telemetry.ex rename to packages/sync-service/lib/electric/telemetry/application_telemetry.ex index 88a1d482c6..1fad149acd 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry/application_telemetry.ex @@ -1,13 +1,18 @@ -defmodule Electric.Telemetry do +defmodule Electric.Telemetry.ApplicationTelemetry do + @moduledoc """ + Collects and exports application level telemetry such as CPU, memory and BEAM metrics. + + See also StackTelemetry for stack specific telemetry. + """ use Supervisor import Telemetry.Metrics require Logger - def start_link(init_arg) do + def start_link(opts) do if Electric.Config.telemetry_export_enabled?() do - Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) else # Avoid starting the telemetry supervisor and its telemetry_poller child if we're not # intending to export periodic measurements metrics anywhere. @@ -15,7 +20,9 @@ defmodule Electric.Telemetry do end end - def init(opts) do + def init(_opts) do + Process.set_label(:application_telemetry_supervisor) + system_metrics_poll_interval = Electric.Config.get_env(:system_metrics_poll_interval) statsd_host = Electric.Config.get_env(:telemetry_statsd_host) prometheus? = not is_nil(Electric.Config.get_env(:prometheus_port)) @@ -27,7 +34,7 @@ defmodule Electric.Telemetry do # and add its default measurements to the list of our custom ones. This allows for all # periodic measurements to be defined in one place. {:telemetry_poller, - measurements: periodic_measurements(opts), + measurements: periodic_measurements(), period: system_metrics_poll_interval, init_delay: :timer.seconds(5)}, statsd_reporter_child_spec(statsd_host), @@ -52,8 +59,7 @@ defmodule Electric.Telemetry do static_info: static_info(), metrics: call_home_metrics(), first_report_in: {2, :minute}, - reporting_period: {30, :minute}, - reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1} + reporting_period: {30, :minute}} end def static_info() do @@ -78,12 +84,6 @@ defmodule Electric.Telemetry do # make sure you also change the receiver def call_home_metrics() do [ - environment: [ - pg_version: - last_value("electric.postgres.info_looked_up.pg_version", - reporter_options: [persist_between_sends: true] - ) - ], resources: [ uptime: last_value("vm.uptime.total", @@ -107,39 +107,6 @@ defmodule Electric.Telemetry do swap_used: last_value("system.swap.used"), swap_free_percent: last_value("system.swap_percent.free"), swap_used_percent: last_value("system.swap_percent.used") - ], - usage: [ - inbound_bytes: - sum("electric.postgres.replication.transaction_received.bytes", unit: :byte), - inbound_transactions: sum("electric.postgres.replication.transaction_received.count"), - inbound_operations: sum("electric.postgres.replication.transaction_received.operations"), - stored_bytes: sum("electric.storage.transaction_stored.bytes", unit: :byte), - stored_transactions: sum("electric.storage.transaction_stored.count"), - stored_operations: sum("electric.storage.transaction_stored.operations"), - total_used_storage_kb: last_value("electric.storage.used", unit: {:byte, :kilobyte}), - total_shapes: last_value("electric.shapes.total_shapes.count"), - active_shapes: - summary("electric.plug.serve_shape.monotonic_time", - unit: :unique, - reporter_options: [count_unique: :shape_handle], - keep: &(&1.status < 300) - ), - unique_clients: - summary("electric.plug.serve_shape.monotonic_time", - unit: :unique, - reporter_options: [count_unique: :client_ip], - keep: &(&1.status < 300) - ), - sync_requests: - counter("electric.plug.serve_shape.monotonic_time", - drop: &(Map.get(&1, :live, false) || false) - ), - live_requests: - counter("electric.plug.serve_shape.monotonic_time", - keep: &(Map.get(&1, :live, false) || false) - ), - served_bytes: sum("electric.plug.serve_shape.bytes", unit: :byte), - wal_size: summary("electric.postgres.replication.wal_size", unit: :byte) ] ] end @@ -169,21 +136,6 @@ defmodule Electric.Telemetry do last_value("vm.total_run_queue_lengths.total"), last_value("vm.total_run_queue_lengths.cpu"), last_value("vm.total_run_queue_lengths.io"), - summary("plug.router_dispatch.stop.duration", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("plug.router_dispatch.exception.duration", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("electric.shape_cache.create_snapshot_task.stop.duration", - unit: {:native, :millisecond} - ), - summary("electric.storage.make_new_snapshot.stop.duration", unit: {:native, :millisecond}), - summary("electric.querying.stream_initial_data.stop.duration", - unit: {:native, :millisecond} - ), last_value("system.load_percent.avg1"), last_value("system.load_percent.avg5"), last_value("system.load_percent.avg15"), @@ -199,10 +151,6 @@ defmodule Electric.Telemetry do [ last_value("system.cpu.core_count"), last_value("system.cpu.utilization.total"), - last_value("electric.postgres.replication.wal_size", unit: :byte), - last_value("electric.storage.used", unit: {:byte, :kilobyte}), - last_value("electric.shapes.total_shapes.count"), - last_value("vm.memory.total", unit: :byte), last_value("vm.memory.processes_used", unit: :byte), last_value("vm.memory.binary", unit: :byte), last_value("vm.memory.ets", unit: :byte), @@ -212,15 +160,16 @@ defmodule Electric.Telemetry do last_value("vm.total_run_queue_lengths.total"), last_value("vm.total_run_queue_lengths.cpu"), last_value("vm.total_run_queue_lengths.io"), - last_value("electric.postgres.replication.wal_size", unit: :byte), - counter("electric.postgres.replication.transaction_received.count"), - sum("electric.postgres.replication.transaction_received.bytes", unit: :byte), - sum("electric.storage.transaction_stored.bytes", unit: :byte) + last_value("vm.uptime.total", + unit: :second, + measurement: &:erlang.convert_time_unit(&1.total, :native, :second) + ), + last_value("vm.memory.total", unit: :byte) ] ++ Enum.map( # Add "system.cpu.utilization.core_*" but since there's no wildcard support we - # explicitly add 64 cores. If there are fewer cores, the missing ones will be ignored. - 0..63, + # explicitly add the cores here. + 0..(:erlang.system_info(:logical_processors) - 1), &last_value("system.cpu.utilization.core_#{&1}") ) end @@ -229,22 +178,11 @@ defmodule Electric.Telemetry do [ last_value("system.load_percent.avg1"), last_value("system.load_percent.avg5"), - last_value("system.load_percent.avg15"), - distribution("electric.plug.serve_shape.duration", - drop: &(Map.get(&1, :live, false) || false) - ), - distribution("electric.shape_cache.create_snapshot_task.stop.duration", - unit: {:native, :millisecond} - ), - distribution("electric.storage.make_new_snapshot.stop.duration", - unit: {:native, :millisecond} - ) + last_value("system.load_percent.avg15") ] ++ prometheus_metrics() end - defp periodic_measurements(opts) do - stack_id = Keyword.fetch!(opts, :stack_id) - + defp periodic_measurements() do [ # Default measurements included with the telemetry_poller application: :memory, @@ -253,13 +191,9 @@ defmodule Electric.Telemetry do # Our custom measurements: {__MODULE__, :uptime_event, []}, - {__MODULE__, :count_shapes, [stack_id]}, {__MODULE__, :cpu_utilization, []}, - {__MODULE__, :get_total_disk_usage, [opts]}, - {__MODULE__, :get_system_load_average, [opts]}, - {__MODULE__, :get_system_memory_usage, [opts]}, - {Electric.Connection.Manager, :report_retained_wal_size, - [Electric.Connection.Manager.name(stack_id)]} + {__MODULE__, :get_system_load_average, []}, + {__MODULE__, :get_system_memory_usage, []} ] end @@ -269,28 +203,6 @@ defmodule Electric.Telemetry do }) end - def count_shapes(stack_id) do - Electric.ShapeCache.list_shapes(stack_id: stack_id) - |> length() - |> then( - &:telemetry.execute([:electric, :shapes, :total_shapes], %{count: &1}, %{stack_id: stack_id}) - ) - end - - def get_total_disk_usage(opts) do - storage = Electric.StackSupervisor.storage_mod_arg(Map.new(opts)) - - Electric.ShapeCache.Storage.get_total_disk_usage(storage) - |> then( - &:telemetry.execute([:electric, :storage], %{used: &1}, %{ - stack_id: opts[:stack_id] - }) - ) - catch - :exit, {:noproc, _} -> - :ok - end - def cpu_utilization do cores = :cpu_sup.util([:per_cpu]) @@ -303,7 +215,7 @@ defmodule Electric.Telemetry do :telemetry.execute([:system, :cpu], %{core_count: Enum.count(cores)}) end - def get_system_load_average(opts) do + def get_system_load_average do cores = :erlang.system_info(:logical_processors) # > The load values are proportional to how long time a runnable Unix @@ -340,18 +252,12 @@ defmodule Electric.Telemetry do |> Map.new(fn probe -> {probe, 100 * (apply(:cpu_sup, probe, []) / 256 / cores)} end) - |> then( - &:telemetry.execute([:system, :load_percent], &1, %{ - stack_id: opts[:stack_id] - }) - ) + |> then(&:telemetry.execute([:system, :load_percent], &1)) end @required_system_memory_keys ~w[system_total_memory free_memory]a - def get_system_memory_usage(opts) do - metadata = %{stack_id: opts[:stack_id]} - + def get_system_memory_usage() do system_memory = Map.new(:memsup.get_system_memory_data()) # Sanity-check that all the required keys are present before doing any arithmetic on them @@ -387,13 +293,13 @@ defmodule Electric.Telemetry do mem_stats = Map.put(mem_stats, :total_memory, total) - :telemetry.execute([:system, :memory], mem_stats, metadata) - :telemetry.execute([:system, :memory_percent], mem_percent_stats, metadata) + :telemetry.execute([:system, :memory], mem_stats) + :telemetry.execute([:system, :memory_percent], mem_percent_stats) mem_stats end - Map.merge(mem_stats, swap_stats(:os.type(), system_memory, metadata)) + Map.merge(mem_stats, swap_stats(:os.type(), system_memory)) end defp resident_memory(%{available_memory: available_memory}) do @@ -423,12 +329,12 @@ defmodule Electric.Telemetry do %{} end - defp swap_stats({:unix, :darwin}, _system_memory, _metadata) do + defp swap_stats({:unix, :darwin}, _system_memory) do # On macOS, swap stats are not available %{} end - defp swap_stats(_os_type, %{total_swap: total, free_swap: free}, metadata) do + defp swap_stats(_os_type, %{total_swap: total, free_swap: free}) do used = total - free swap_stats = %{total_swap: total, free_swap: free, used_swap: used} @@ -440,14 +346,14 @@ defmodule Electric.Telemetry do %{free_swap: 0, used_swap: 0} end - :telemetry.execute([:system, :swap], swap_stats, metadata) - :telemetry.execute([:system, :swap_percent], swap_percent_stats, metadata) + :telemetry.execute([:system, :swap], swap_stats) + :telemetry.execute([:system, :swap_percent], swap_percent_stats) swap_stats end @required_swap_keys ~w[total_swap free_swap]a - defp swap_stats(_os_type, system_memory, _metadata) do + defp swap_stats(_os_type, system_memory) do missing_swap_keys = Enum.reject(@required_swap_keys, &Map.has_key?(system_memory, &1)) Logger.warning( diff --git a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex index f434325a00..3a256a513b 100644 --- a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex +++ b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex @@ -18,7 +18,7 @@ defmodule Electric.Telemetry.CallHomeReporter do static_info = Keyword.get(opts, :static_info, %{}) first_report_in = cast_time_to_ms(Keyword.fetch!(opts, :first_report_in)) reporting_period = cast_time_to_ms(Keyword.fetch!(opts, :reporting_period)) - reporter_fn = Keyword.fetch!(opts, :reporter_fn) + reporter_fn = Keyword.get(opts, :reporter_fn, &report_home/1) GenServer.start_link( __MODULE__, diff --git a/packages/sync-service/lib/electric/telemetry/stack_telemetry.ex b/packages/sync-service/lib/electric/telemetry/stack_telemetry.ex new file mode 100644 index 0000000000..9959810b37 --- /dev/null +++ b/packages/sync-service/lib/electric/telemetry/stack_telemetry.ex @@ -0,0 +1,264 @@ +defmodule Electric.Telemetry.StackTelemetry do + @moduledoc """ + Collects and exports stack level telemetry such as database and shape metrics. + + If multiple databases are used, each database will have it's own stack and it's own StackTelemetry. + + See also ApplicationTelemetry for application/system level specific telemetry. + """ + use Supervisor + + import Telemetry.Metrics + + require Logger + + def start_link(opts) do + if Electric.Config.telemetry_export_enabled?() do + Supervisor.start_link(__MODULE__, opts) + else + # Avoid starting the telemetry supervisor and its telemetry_poller child if we're not + # intending to export periodic measurements metrics anywhere. + :ignore + end + end + + def init(opts) do + Process.set_label({:stack_telemetry_supervisor, stack_id(opts)}) + + system_metrics_poll_interval = Electric.Config.get_env(:system_metrics_poll_interval) + statsd_host = Electric.Config.get_env(:telemetry_statsd_host) + prometheus? = not is_nil(Electric.Config.get_env(:prometheus_port)) + call_home_telemetry? = Electric.Config.get_env(:call_home_telemetry?) + otel_metrics? = not is_nil(Application.get_env(:otel_metric_exporter, :otlp_endpoint)) + + [ + {:telemetry_poller, + measurements: periodic_measurements(opts), + period: system_metrics_poll_interval, + init_delay: :timer.seconds(3)}, + statsd_reporter_child_spec(statsd_host, opts), + prometheus_reporter_child_spec(prometheus?, opts), + call_home_reporter_child_spec(call_home_telemetry?, opts), + otel_reporter_child_spec(otel_metrics?, opts) + ] + |> Enum.reject(&is_nil/1) + |> Supervisor.init(strategy: :one_for_one) + end + + defp otel_reporter_child_spec(true, opts) do + {OtelMetricExporter, + name: :"stack_otel_telemetry_#{stack_id(opts)}", + metrics: otel_metrics(opts), + export_period: :timer.seconds(30), + resource: %{stack_id: stack_id(opts)}} + end + + defp otel_reporter_child_spec(false, _opts), do: nil + + defp call_home_reporter_child_spec(false, _opts), do: nil + + defp call_home_reporter_child_spec(true, opts) do + {Electric.Telemetry.CallHomeReporter, + name: :"stack_call_home_telemetry_#{stack_id(opts)}", + static_info: static_info(opts), + metrics: call_home_metrics(opts), + first_report_in: {2, :minute}, + reporting_period: {30, :minute}} + end + + def static_info(opts) do + {total_mem, _, _} = :memsup.get_memory_data() + processors = :erlang.system_info(:logical_processors) + {os_family, os_name} = :os.type() + arch = :erlang.system_info(:system_architecture) + + %{ + electric_version: to_string(Electric.version()), + environment: %{ + os: %{family: os_family, name: os_name}, + arch: to_string(arch), + cores: processors, + ram: total_mem, + electric_instance_id: Electric.instance_id(), + stack_id: stack_id(opts) + } + } + end + + # IMPORTANT: these metrics are validated on the receiver side, so if you change them, + # make sure you also change the receiver + def call_home_metrics(opts) do + for_stack = for_stack(opts) + + [ + environment: [ + pg_version: + last_value("electric.postgres.info_looked_up.pg_version", + reporter_options: [persist_between_sends: true], + keep: for_stack + ) + ], + usage: [ + inbound_bytes: + sum("electric.postgres.replication.transaction_received.bytes", + unit: :byte, + keep: for_stack + ), + inbound_transactions: + sum("electric.postgres.replication.transaction_received.count", keep: for_stack), + inbound_operations: + sum("electric.postgres.replication.transaction_received.operations", keep: for_stack), + stored_bytes: + sum("electric.storage.transaction_stored.bytes", unit: :byte, keep: for_stack), + stored_transactions: sum("electric.storage.transaction_stored.count", keep: for_stack), + stored_operations: sum("electric.storage.transaction_stored.operations", keep: for_stack), + total_used_storage_kb: + last_value("electric.storage.used", unit: {:byte, :kilobyte}, keep: for_stack), + total_shapes: last_value("electric.shapes.total_shapes.count", keep: for_stack), + active_shapes: + summary("electric.plug.serve_shape.monotonic_time", + unit: :unique, + reporter_options: [count_unique: :shape_handle], + keep: &(&1.status < 300 && for_stack.(&1)) + ), + unique_clients: + summary("electric.plug.serve_shape.monotonic_time", + unit: :unique, + reporter_options: [count_unique: :client_ip], + keep: &(&1.status < 300 && for_stack.(&1)) + ), + sync_requests: + counter("electric.plug.serve_shape.monotonic_time", + keep: &(&1[:live] != true && for_stack.(&1)) + ), + live_requests: + counter("electric.plug.serve_shape.monotonic_time", + keep: &(&1[:live] && for_stack.(&1)) + ), + served_bytes: sum("electric.plug.serve_shape.bytes", unit: :byte, keep: for_stack), + wal_size: summary("electric.postgres.replication.wal_size", unit: :byte, keep: for_stack) + ] + ] + end + + defp statsd_reporter_child_spec(nil, _opts), do: nil + + defp statsd_reporter_child_spec(host, opts) do + {TelemetryMetricsStatsd, + host: host, + formatter: :datadog, + global_tags: [instance_id: Electric.instance_id()], + metrics: statsd_metrics(opts)} + end + + defp prometheus_reporter_child_spec(false, _opts), do: nil + + defp prometheus_reporter_child_spec(true, opts) do + {TelemetryMetricsPrometheus.Core, + name: :"stack_prometheus_telemetry_#{stack_id(opts)}", metrics: prometheus_metrics(opts)} + end + + defp statsd_metrics(opts) do + [ + summary("plug.router_dispatch.stop.duration", + tags: [:route], + unit: {:native, :millisecond}, + keep: for_stack(opts) + ), + summary("plug.router_dispatch.exception.duration", + tags: [:route], + unit: {:native, :millisecond}, + keep: for_stack(opts) + ), + summary("electric.shape_cache.create_snapshot_task.stop.duration", + unit: {:native, :millisecond}, + keep: for_stack(opts) + ), + summary("electric.storage.make_new_snapshot.stop.duration", + unit: {:native, :millisecond}, + keep: for_stack(opts) + ), + summary("electric.querying.stream_initial_data.stop.duration", + unit: {:native, :millisecond}, + keep: for_stack(opts) + ) + ] + |> Enum.map(&%{&1 | tags: [:instance_id | &1.tags]}) + end + + defp prometheus_metrics(opts) do + [ + last_value("electric.postgres.replication.wal_size", unit: :byte, keep: for_stack(opts)), + last_value("electric.storage.used", unit: {:byte, :kilobyte}, keep: for_stack(opts)), + last_value("electric.shapes.total_shapes.count", keep: for_stack(opts)), + last_value("electric.postgres.replication.wal_size", unit: :byte, keep: for_stack(opts)), + counter("electric.postgres.replication.transaction_received.count", keep: for_stack(opts)), + sum("electric.postgres.replication.transaction_received.bytes", + unit: :byte, + keep: for_stack(opts) + ), + sum("electric.storage.transaction_stored.bytes", unit: :byte, keep: for_stack(opts)) + ] + end + + defp otel_metrics(opts) do + for_stack = for_stack(opts) + + [ + distribution("electric.plug.serve_shape.duration", + keep: &(&1[:live] != true && for_stack.(&1)) + ), + distribution("electric.shape_cache.create_snapshot_task.stop.duration", + unit: {:native, :millisecond}, + keep: for_stack + ), + distribution("electric.storage.make_new_snapshot.stop.duration", + unit: {:native, :millisecond}, + keep: for_stack + ) + ] ++ prometheus_metrics(opts) + end + + defp periodic_measurements(opts) do + [ + {__MODULE__, :count_shapes, [stack_id(opts)]}, + {__MODULE__, :get_total_disk_usage, [opts]}, + {Electric.Connection.Manager, :report_retained_wal_size, + [Electric.Connection.Manager.name(stack_id(opts))]} + ] + end + + def count_shapes(stack_id) do + Electric.ShapeCache.list_shapes(stack_id: stack_id) + |> length() + |> then( + &:telemetry.execute([:electric, :shapes, :total_shapes], %{count: &1}, %{stack_id: stack_id}) + ) + end + + def get_total_disk_usage(opts) do + storage = Electric.StackSupervisor.storage_mod_arg(Map.new(opts)) + + Electric.ShapeCache.Storage.get_total_disk_usage(storage) + |> then( + &:telemetry.execute([:electric, :storage], %{used: &1}, %{ + stack_id: opts[:stack_id] + }) + ) + catch + :exit, {:noproc, _} -> + :ok + end + + defp stack_id(opts) do + Keyword.fetch!(opts, :stack_id) + end + + def for_stack(opts) do + stack_id = stack_id(opts) + + fn metadata -> + metadata[:stack_id] == stack_id + end + end +end diff --git a/packages/sync-service/mix.exs b/packages/sync-service/mix.exs index e5481d4f7f..cbe0e34d79 100644 --- a/packages/sync-service/mix.exs +++ b/packages/sync-service/mix.exs @@ -98,6 +98,8 @@ defmodule Electric.MixProject do {:opentelemetry_telemetry, "~> 1.1"}, {:opentelemetry_semantic_conventions, "~> 1.27"}, {:otel_metric_exporter, "~> 0.2"}, + # For debugging the otel_metric_exporter check it out locally and uncomment the line below + # {:otel_metric_exporter, path: "../../../elixir-otel-metric-exporter"}, {:pg_query_ex, "0.5.3"}, {:plug, "~> 1.16"}, {:postgrex, "~> 0.19"}, diff --git a/packages/sync-service/mix.lock b/packages/sync-service/mix.lock index 4413a67d1a..a8467229ef 100644 --- a/packages/sync-service/mix.lock +++ b/packages/sync-service/mix.lock @@ -28,7 +28,7 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, - "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, + "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_ownership": {:hex, :nimble_ownership, "1.0.0", "3f87744d42c21b2042a0aa1d48c83c77e6dd9dd357e425a038dd4b49ba8b79a1", [:mix], [], "hexpm", "7c16cc74f4e952464220a73055b557a273e8b1b7ace8489ec9d86e9ad56cb2cc"}, @@ -39,7 +39,7 @@ "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.8.0", "5d546123230771ef4174e37bedfd77e3374913304cd6ea3ca82a2add49cd5d56", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.4.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "a1f9f271f8d3b02b81462a6bfef7075fd8457fdb06adff5d2537df5e2264d9af"}, "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.1", "4a73bfa29d7780ffe33db345465919cef875034854649c37ac789eb8e8f38b21", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ee43b14e6866123a3ee1344e3c0d3d7591f4537542c2a925fcdbf46249c9b50b"}, - "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.2.2", "4594d95d4cb5ee49549125feec1a10e388cfe055d8060779e98808989ee039eb", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.13.0", [hex: :protobuf, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "757a28627e5f036543e9f5711051c0e18511d65cdd063e08181b44580d90eac1"}, + "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.2.4", "1e476848c52e7967d4e938603b0db840798ae3525407e85596508885e68374ce", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.13.0", [hex: :protobuf, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "bbb8667cae543df822def74625d1e1bf62206a5349da14604b8a9a563507a080"}, "pg_query_ex": {:hex, :pg_query_ex, "0.5.3", "84bf09f4ea10ada6e1cbfd739ccb9ffb6e5b21d87ab81cf52a42fefcc1808566", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:protox, "~> 1.7", [hex: :protox, repo: "hexpm", optional: false]}], "hexpm", "ec0554d6d287da4cc15cc773577ef61cf41d5d6fcc784bb85f6209439cb246a7"}, "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, @@ -52,7 +52,7 @@ "sentry": {:hex, :sentry, "10.8.0", "1e8cc0ef21401e5914e6fc2f37489d6c685d31a0556dbd8ab4709cc1587a7232", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "92549e7ba776b7ccfed4e74d58987272d37d99606b130e4141bc015a1a8e4235"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, - "telemetry_metrics": {:hex, :telemetry_metrics, "1.0.0", "29f5f84991ca98b8eb02fc208b2e6de7c95f8bb2294ef244a176675adc7775df", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f23713b3847286a534e005126d4c959ebcca68ae9582118ce436b521d1d47d5d"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.2.1", "c9755987d7b959b557084e6990990cb96a50d6482c683fb9622a63837f3cd3d8", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5e2c599da4983c4f88a33e9571f1458bf98b0cf6ba930f1dc3a6e8cf45d5afb6"}, "telemetry_metrics_statsd": {:hex, :telemetry_metrics_statsd, "0.7.1", "3502235bb5b35ce50d608bf0f34369ef76eb92a4dbc8708c7e8780ca0da2d53e", [:mix], [{:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "06338d9dc3b4a202f11a6e706fd3feba4c46100d0aca23688dea0b8f801c361f"}, "telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"}, diff --git a/packages/sync-service/test/electric/telemetry_test.exs b/packages/sync-service/test/electric/telemetry/application_telemetry_test.exs similarity index 81% rename from packages/sync-service/test/electric/telemetry_test.exs rename to packages/sync-service/test/electric/telemetry/application_telemetry_test.exs index 765f57c347..03ea78969c 100644 --- a/packages/sync-service/test/electric/telemetry_test.exs +++ b/packages/sync-service/test/electric/telemetry/application_telemetry_test.exs @@ -1,6 +1,8 @@ defmodule Electric.TelemetryTest do use ExUnit.Case, async: true + alias Electric.Telemetry.ApplicationTelemetry + describe "get_system_memory_usage" do test "returns calculated memory stats" do case :os.type() do @@ -11,7 +13,7 @@ defmodule Electric.TelemetryTest do free_memory: _, used_memory: _, resident_memory: _ - } = Electric.Telemetry.get_system_memory_usage([]) + } = ApplicationTelemetry.get_system_memory_usage() _ -> assert %{ @@ -25,7 +27,7 @@ defmodule Electric.TelemetryTest do total_swap: _, free_swap: _, used_swap: _ - } = Electric.Telemetry.get_system_memory_usage([]) + } = ApplicationTelemetry.get_system_memory_usage() end end end