Skip to content

Commit

Permalink
feat(sync-service): Connect shape consumer spans to replication trace…
Browse files Browse the repository at this point in the history
…s and add OTEL metrics (#2288)

This PR improves observability by:
- Connecting shape consumer spans to replication traces
- Adding `electric.shapes.total_shapes.count` metric to OTEL metrics
(Closes #2269 although I have not added "shapes that have an active
subscriber" as this information is already available in the traces)
- Adding `electric.storage.used` metric to OTEL metrics
- Adding per-core CPU % utilisation telemetry and exposing it in the
OTEL metrics. CPU metrics are already available, but % utilisation per
core is simple to comprehend and is in a form our customers will
understand.
  • Loading branch information
robacourt authored Feb 4, 2025
1 parent abbde9c commit c4e4e75
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/smooth-lions-joke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Connect shape consumer spans to replication traces and add OTEL metrics
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ defmodule Electric.Replication.ShapeLogCollector do
# determining how long a write should reasonably take and if that fails
# it should raise.
def store_transaction(%Transaction{} = txn, server) do
GenStage.call(server, {:new_txn, txn}, :infinity)
trace_context = OpenTelemetry.get_current_context()
GenStage.call(server, {:new_txn, txn, trace_context}, :infinity)
end

def handle_relation_msg(%Changes.Relation{} = rel, server) do
GenServer.call(server, {:relation_msg, rel}, :infinity)
trace_context = OpenTelemetry.get_current_context()
GenServer.call(server, {:relation_msg, rel, trace_context}, :infinity)
end

def init(opts) do
Expand Down Expand Up @@ -94,14 +96,17 @@ defmodule Electric.Replication.ShapeLogCollector do
{:noreply, [], remove_subscription(from, state)}
end

def handle_call({:new_txn, %Transaction{xid: xid, lsn: lsn} = txn}, from, state) do
def handle_call({:new_txn, %Transaction{xid: xid, lsn: lsn} = txn, trace_context}, from, state) do
OpenTelemetry.set_current_context(trace_context)

Logger.info("Received transaction #{xid} from Postgres at #{lsn}")
Logger.debug(fn -> "Txn received in ShapeLogCollector: #{inspect(txn)}" end)

handle_transaction(txn, from, state)
end

def handle_call({:relation_msg, %Relation{} = rel}, from, state) do
def handle_call({:relation_msg, %Relation{} = rel, trace_context}, from, state) do
OpenTelemetry.set_current_context(trace_context)
Logger.info("Received relation #{inspect(rel.schema)}.#{inspect(rel.table)}")
Logger.debug(fn -> "Relation received in ShapeLogCollector: #{inspect(rel)}" end)

Expand Down
15 changes: 10 additions & 5 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ defmodule Electric.Shapes.Consumer do

# `Shapes.Dispatcher` only works with single-events, so we can safely assert
# that here
def handle_events([%Changes.Relation{id: id}], _from, %{shape: %{root_table_id: id}} = state) do
# Entry point for dispatched events. Each delivery is a one-element list
# holding an `{event, trace_context}` pair; the trace context is installed as
# this process's current OpenTelemetry context before the event itself is
# handed to the matching `handle_event/2` clause.
def handle_events([{evt, otel_ctx}], _from, consumer_state) do
  OpenTelemetry.set_current_context(otel_ctx)

  handle_event(evt, consumer_state)
end

defp handle_event(%Changes.Relation{id: id}, %{shape: %{root_table_id: id}} = state) do
%{shape: %{root_table: root_table}, inspector: inspector} = state

Logger.info(
Expand All @@ -211,20 +216,20 @@ defmodule Electric.Shapes.Consumer do
end

# Buffer incoming transactions until we know our xmin
def handle_events([%Transaction{xid: xid}] = txns, _from, %{snapshot_xmin: nil} = state) do
defp handle_event(%Transaction{xid: xid} = txn, %{snapshot_xmin: nil} = state) do
Logger.debug(fn ->
"Consumer for #{state.shape_handle} buffering 1 transaction with xid #{xid}"
end)

{:noreply, [], %{state | buffer: state.buffer ++ txns}}
{:noreply, [], %{state | buffer: state.buffer ++ [txn]}}
end

def handle_events([%Transaction{}] = txns, _from, state) do
defp handle_event(%Transaction{} = txn, state) do
OpenTelemetry.with_span(
"shape_write.consumer.handle_txns",
[snapshot_xmin: state.snapshot_xmin],
state.stack_id,
fn -> handle_txns(txns, state) end
fn -> handle_txns([txn], state) end
)
end

Expand Down
5 changes: 4 additions & 1 deletion packages/sync-service/lib/electric/shapes/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Electric.Shapes.Dispatcher do

alias Electric.Shapes.Filter
alias Electric.Shapes.Partitions
alias Electric.Telemetry.OpenTelemetry

defmodule State do
defstruct [:waiting, :pending, :subscribers, :filter, :partitions, :pids]
Expand Down Expand Up @@ -135,11 +136,13 @@ defmodule Electric.Shapes.Dispatcher do
def dispatch([event], _length, %State{waiting: 0, subscribers: subscribers} = state) do
{partitions, event} = Partitions.handle_event(state.partitions, event)

context = OpenTelemetry.get_current_context()

{waiting, pending} =
state.filter
|> Filter.affected_shapes(event)
|> Enum.reduce({0, MapSet.new()}, fn {pid, ref} = subscriber, {waiting, pending} ->
Process.send(pid, {:"$gen_consumer", {self(), ref}, [event]}, [:noconnect])
Process.send(pid, {:"$gen_consumer", {self(), ref}, [{event, context}]}, [:noconnect])
{waiting + 1, MapSet.put(pending, subscriber)}
end)
|> case do
Expand Down
27 changes: 26 additions & 1 deletion packages/sync-service/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ defmodule Electric.Telemetry do

defp otel_metrics() do
[
last_value("system.cpu.core_count"),
last_value("system.cpu.utilization.total"),
last_value("electric.storage.used", unit: {:byte, :kilobyte}),
last_value("electric.shapes.total_shapes.count"),
last_value("system.load_percent.avg1"),
last_value("system.load_percent.avg5"),
last_value("system.load_percent.avg15"),
Expand All @@ -222,7 +226,9 @@ defmodule Electric.Telemetry do
distribution("electric.storage.make_new_snapshot.stop.duration",
unit: {:native, :millisecond}
)
] ++ prometheus_metrics()
] ++
prometheus_metrics() ++
Enum.map(0..63, &last_value("system.cpu.utilization.core_#{&1}"))
end

defp periodic_measurements(opts) do
Expand All @@ -232,6 +238,7 @@ defmodule Electric.Telemetry do
# A module, function and arguments to be invoked periodically.
{__MODULE__, :uptime_event, []},
{__MODULE__, :count_shapes, [stack_id]},
{__MODULE__, :cpu_utilization, []},
{__MODULE__, :get_total_disk_usage, [opts]},
{__MODULE__, :get_system_load_average, [opts]},
{__MODULE__, :get_system_memory_usage, [opts]},
Expand Down Expand Up @@ -268,6 +275,18 @@ defmodule Electric.Telemetry do
:ok
end

# Periodic measurement that reports CPU utilisation.
#
# Emits two telemetry events when per-core data is available:
#
#   * `[:system, :cpu, :utilization]` with one `core_<index>` measurement per
#     core (the "busy" value reported by `:cpu_sup`) plus `:total`, the mean
#     across all cores;
#   * `[:system, :cpu]` with `core_count`, the number of cores reported.
#
# Always returns `:ok`.
def cpu_utilization do
  case :cpu_sup.util([:per_cpu]) do
    [_ | _] = per_core ->
      # The atom set is bounded by the number of cores on the host, so building
      # the keys dynamically does not grow the atom table unboundedly.
      cores =
        Map.new(per_core, fn {cpu_index, busy, _free, _misc} ->
          {:"core_#{cpu_index}", busy}
        end)

      measurements = Map.put(cores, :total, cores |> Map.values() |> mean())

      :telemetry.execute([:system, :cpu, :utilization], measurements)
      :telemetry.execute([:system, :cpu], %{core_count: map_size(cores)})

    _unavailable ->
      # `:cpu_sup.util/1` does not return a per-core list when utilisation data
      # is unavailable (e.g. `{:all, 0, 0, []}` or an error tuple). Skip this
      # sample rather than crash or emit a `nil` total.
      :ok
  end
end

def get_system_load_average(opts) do
cores = :erlang.system_info(:logical_processors)

Expand Down Expand Up @@ -422,4 +441,10 @@ defmodule Electric.Telemetry do

%{}
end

# Arithmetic mean of a list of numbers; `nil` for an empty list, since the
# average of nothing is undefined.
defp mean(values) when is_list(values) do
  case length(values) do
    0 -> nil
    count -> Enum.sum(values) / count
  end
end
end
32 changes: 16 additions & 16 deletions packages/sync-service/test/electric/shapes/dispatcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ defmodule Electric.Shapes.DispatcherTest do

{:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher)

assert_receive {C, ^ref1, [^event]}
assert_receive {C, ^ref1, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c1, dispatcher)
assert_receive {C, ^ref2, [^event]}
assert_receive {C, ^ref2, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c2, dispatcher)
assert_receive {C, ^ref3, [^event]}
assert_receive {C, ^ref3, [{^event, _ctx}]}
# now that all consumers have received and processed the message we should
# forward demand onto the producer
assert {:ok, 1, _dispatcher} = D.ask(1, c3, dispatcher)
Expand All @@ -90,11 +90,11 @@ defmodule Electric.Shapes.DispatcherTest do

{:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher)

refute_receive {C, ^ref1, [^event]}
refute_receive {C, ^ref1, [{^event, _ctx}]}

assert_receive {C, ^ref2, [^event]}
assert_receive {C, ^ref2, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c2, dispatcher)
assert_receive {C, ^ref3, [^event]}
assert_receive {C, ^ref3, [{^event, _ctx}]}
assert {:ok, 1, _dispatcher} = D.ask(1, c3, dispatcher)
end

Expand All @@ -113,14 +113,14 @@ defmodule Electric.Shapes.DispatcherTest do

{:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher)

assert_receive {C, ^ref1, [^event]}
assert_receive {C, ^ref1, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c1, dispatcher)

{:ok, 0, dispatcher} = D.cancel(c1, dispatcher)

assert_receive {C, ^ref2, [^event]}
assert_receive {C, ^ref2, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c2, dispatcher)
assert_receive {C, ^ref3, [^event]}
assert_receive {C, ^ref3, [{^event, _ctx}]}
assert {:ok, 1, _dispatcher} = D.ask(1, c3, dispatcher)
end

Expand All @@ -141,14 +141,14 @@ defmodule Electric.Shapes.DispatcherTest do

{:ok, 0, dispatcher} = D.cancel(c2, dispatcher)

assert_receive {C, ^ref1, [^event]}
assert_receive {C, ^ref1, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c1, dispatcher)

# we've cancelled but haven't killed the pid (and even if we had, it will
# have likely already sent the confirmation message)
assert_receive {C, ^ref2, [^event]}
assert_receive {C, ^ref2, [{^event, _ctx}]}

assert_receive {C, ^ref3, [^event]}
assert_receive {C, ^ref3, [{^event, _ctx}]}
assert {:ok, 1, _dispatcher} = D.ask(1, c3, dispatcher)
end

Expand All @@ -168,7 +168,7 @@ defmodule Electric.Shapes.DispatcherTest do

{:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher)

assert_receive {C, ^ref1, [^event]}
assert_receive {C, ^ref1, [{^event, _ctx}]}
assert {:ok, 0, dispatcher} = D.ask(1, c1, dispatcher)

{:ok, 0, dispatcher} = D.cancel(c2, dispatcher)
Expand All @@ -189,9 +189,9 @@ defmodule Electric.Shapes.DispatcherTest do
event = @transaction

{:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher)
refute_receive {C, ^ref1, [^event]}
refute_receive {C, ^ref2, [^event]}
refute_receive {C, ^ref3, [^event]}
refute_receive {C, ^ref1, [{^event, _ctx}]}
refute_receive {C, ^ref2, [{^event, _ctx}]}
refute_receive {C, ^ref3, [{^event, _ctx}]}
# none of the subscribers want the event, but we need to simulate the full cycle
# so the dispatcher should generate some fake demand. This goes to the
# last subscriber, which is at the head of the list
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/test/support/transaction_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Support.TransactionConsumer do
{:manual, {id, from, parent}}
end

def handle_events([txn], _from, {id, subscription, parent}) do
def handle_events([{txn, _ctx}], _from, {id, subscription, parent}) do
send(parent, {__MODULE__, {id, self()}, [txn]})
GenStage.ask(subscription, 1)
{:noreply, [], {id, subscription, parent}}
Expand Down

0 comments on commit c4e4e75

Please sign in to comment.