diff --git a/integration-tests/tests/invalidated-replication-slot.lux b/integration-tests/tests/invalidated-replication-slot.lux index abf1653fef..025500bbfb 100644 --- a/integration-tests/tests/invalidated-replication-slot.lux +++ b/integration-tests/tests/invalidated-replication-slot.lux @@ -15,6 +15,29 @@ ### +[macro capture_stack_supervisor_pid] + ?pid=(<\d+\.\d+\.\d+>) \[debug\] The single StackSupervisor is initializing... + # All interactive commands and matches are performed within a single IEx session, + # so we can assign local variables here which is less of a hassle than using lux variables. + !stack_supervisor_pid = :erlang.list_to_pid(~c"$1") + !stack_supervisor_mon = Process.monitor(stack_supervisor_pid) +[endmacro] + +[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error] + ??$invalidated_slot_error + ??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error + + !receive do \ + {:DOWN, ^stack_supervisor_mon, :process, ^stack_supervisor_pid, _} -> IO.puts "stack supervisor is down" \ + end + ??stack supervisor is down + + !IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}") + ??Stack supervisor pid: nil +[endmacro] + +### + ## Start a new Postgres cluster configured for easy replication slot invalidation. [invoke setup_pg \ "--wal-segsize=1" \ @@ -24,6 +47,8 @@ [invoke setup_electric] [shell electric] + [invoke capture_stack_supervisor_pid] + ??[info] Starting replication from postgres # Verify that the stack supervisor is registered using regular process registration. If we @@ -40,14 +65,6 @@ [shell pg] ?invalidating slot "electric_slot_integration" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size -[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error] - ??$invalidated_slot_error - ??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error - - !IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}") - ??Stack supervisor pid: nil -[endmacro] - ## Observe the fatal connection error. [shell electric] # Reset the failure pattern because we'll be matching on an error. @@ -59,6 +76,8 @@ !:ok = Application.stop(:electric) !:ok = Application.start(:electric) + [invoke capture_stack_supervisor_pid] + ??[info] Starting replication from postgres [invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error] diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index a5d1028814..a74b1d64db 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -34,6 +34,8 @@ defmodule Electric.StackSupervisor do alias Electric.ShapeCache.LogChunker + require Logger + @opts_schema NimbleOptions.new!( name: [type: :any, required: false], stack_id: [type: :string, required: true], @@ -201,6 +203,8 @@ defmodule Electric.StackSupervisor do @impl true def init(%{stack_id: stack_id} = config) do + Logger.debug("The single StackSupervisor is initializing...") + Process.set_label({:stack_supervisor, stack_id}) Logger.metadata(stack_id: stack_id) Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)