Skip to content

Commit

Permalink
More determinism in the invalidated-replication-slot.lux test (#2318)
Browse files Browse the repository at this point in the history
Monitor the stack supervisor process to guarantee its eventual shutdown.
Previously, it might not have shut down in time before the liveness
check performed in the integration test.
  • Loading branch information
alco authored Feb 10, 2025
1 parent 2d9a636 commit 2e28583
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
35 changes: 27 additions & 8 deletions integration-tests/tests/invalidated-replication-slot.lux
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,29 @@

###

[macro capture_stack_supervisor_pid]
?pid=(<\d+\.\d+\.\d+>) \[debug\] The single StackSupervisor is initializing...
# All interactive commands and matches are performed within a single IEx session,
# so we can assign local variables here which is less of a hassle than using lux variables.
!stack_supervisor_pid = :erlang.list_to_pid(~c"$1")
!stack_supervisor_mon = Process.monitor(stack_supervisor_pid)
[endmacro]

[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error]
??$invalidated_slot_error
??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error

!receive do \
{:DOWN, ^stack_supervisor_mon, :process, ^stack_supervisor_pid, _} -> IO.puts "stack supervisor is down" \
end
??stack supervisor is down

!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: nil
[endmacro]

###

## Start a new Postgres cluster configured for easy replication slot invalidation.
[invoke setup_pg \
"--wal-segsize=1" \
Expand All @@ -24,6 +47,8 @@
[invoke setup_electric]

[shell electric]
[invoke capture_stack_supervisor_pid]

??[info] Starting replication from postgres

# Verify that the stack supervisor is registered using regular process registration. If we
Expand All @@ -40,14 +65,6 @@
[shell pg]
?invalidating slot "electric_slot_integration" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error]
??$invalidated_slot_error
??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error

!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: nil
[endmacro]

## Observe the fatal connection error.
[shell electric]
# Reset the failure pattern because we'll be matching on an error.
Expand All @@ -59,6 +76,8 @@
!:ok = Application.stop(:electric)
!:ok = Application.start(:electric)

[invoke capture_stack_supervisor_pid]

??[info] Starting replication from postgres

[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]
Expand Down
4 changes: 4 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ defmodule Electric.StackSupervisor do

alias Electric.ShapeCache.LogChunker

require Logger

@opts_schema NimbleOptions.new!(
name: [type: :any, required: false],
stack_id: [type: :string, required: true],
Expand Down Expand Up @@ -201,6 +203,8 @@ defmodule Electric.StackSupervisor do

@impl true
def init(%{stack_id: stack_id} = config) do
Logger.debug("The single StackSupervisor is initializing...")

Process.set_label({:stack_supervisor, stack_id})
Logger.metadata(stack_id: stack_id)
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
Expand Down

0 comments on commit 2e28583

Please sign in to comment.