fix: Do not await for responses when recovering pub filters (#2265)
`recover_shape` and `refresh_publication` are called in the `init` path of
`ShapeCache`, and there is no reason to wait for their responses there.

`recover_shape` has no expected failure mode, as it simply updates a map
with the appropriate counters.

`refresh_publication` schedules a publication update, but we don't have to
wait for it to finish before considering `ShapeCache` ready: the shape
consumers are already up and running, and any new shapes will trigger
another publication update after this one.
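
The difference between the two call styles can be sketched as follows. This is only an illustration, not the `PublicationManager` code: `FilterCounter`, `recover/2` and `count/2` are hypothetical names, and the server state is assumed to be a plain map of per-relation counters.

```elixir
defmodule FilterCounter do
  # Hypothetical stand-in for a server that only bumps per-relation counters,
  # i.e. the kind of update that has no expected failure.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Fire-and-forget: returns :ok as soon as the message has been sent,
  # without waiting for the server to handle it.
  def recover(server, relation), do: GenServer.cast(server, {:recover, relation})

  # Synchronous read, used here only to observe the resulting state.
  def count(server, relation), do: GenServer.call(server, {:count, relation})

  @impl true
  def init(counters), do: {:ok, counters}

  @impl true
  def handle_cast({:recover, relation}, counters) do
    {:noreply, Map.update(counters, relation, 1, &(&1 + 1))}
  end

  @impl true
  def handle_call({:count, relation}, _from, counters) do
    {:reply, Map.get(counters, relation, 0), counters}
  end
end

{:ok, pid} = FilterCounter.start_link()
:ok = FilterCounter.recover(pid, "public.items")
:ok = FilterCounter.recover(pid, "public.items")

# Messages from one sender to one receiver are delivered in order, so this
# call is handled after both casts and observes their effect.
IO.inspect(FilterCounter.count(pid, "public.items"))
# prints 2
```

The trade-off is that a cast gives the caller no way to learn about a failure, which is acceptable only when, as described above, no failure is expected or the caller does not depend on the outcome.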

These changes are based on Sentry errors observed in the cloud.

I've also set the publication update debounce time to 0ms, so it now
simply waits until the end of the current process message queue. This
addresses an unnecessary delay in `prepare_table` calls (@robacourt), since
even when there was no contention it would still debounce for 50ms. I think
that simply reinserting the update at the end of the message queue should
still handle things pretty well, but we can check the benchmarks.
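
To illustrate why a 0ms debounce can still aggregate updates, here is a minimal sketch under stated assumptions. `Debouncer`, `add/2` and the `{:flushed, items}` message are hypothetical and not part of the sync service; the sketch assumes the update is scheduled with `Process.send_after/3`, which the commit does not show. `:sys.suspend/1` is used only to make the demonstration deterministic by queueing all three casts before any of them is handled.

```elixir
defmodule Debouncer do
  # Hypothetical illustration of a debounced flush, not the real implementation.
  use GenServer

  def start_link(notify_pid, timeout \\ 0) do
    GenServer.start_link(__MODULE__, {notify_pid, timeout})
  end

  def add(server, item), do: GenServer.cast(server, {:add, item})

  @impl true
  def init({notify_pid, timeout}) do
    {:ok, %{notify: notify_pid, timeout: timeout, pending: [], scheduled: false}}
  end

  @impl true
  def handle_cast({:add, item}, state) do
    state = %{state | pending: [item | state.pending]}
    {:noreply, schedule_flush(state)}
  end

  @impl true
  def handle_info(:flush, state) do
    # Report everything accumulated since the flush was scheduled.
    send(state.notify, {:flushed, Enum.reverse(state.pending)})
    {:noreply, %{state | pending: [], scheduled: false}}
  end

  # Only one flush is outstanding at a time.
  defp schedule_flush(%{scheduled: true} = state), do: state

  defp schedule_flush(state) do
    # With a timeout of 0 the :flush message is enqueued behind whatever is
    # already waiting in this process's mailbox.
    Process.send_after(self(), :flush, state.timeout)
    %{state | scheduled: true}
  end
end

{:ok, pid} = Debouncer.start_link(self())

# Suspend the server so that all three casts are queued before any is handled.
:ok = :sys.suspend(pid)
Enum.each([:a, :b, :c], &Debouncer.add(pid, &1))
:ok = :sys.resume(pid)

batch =
  receive do
    {:flushed, items} -> items
  after
    1_000 -> :timeout
  end

IO.inspect(batch)
# prints [:a, :b, :c]
```

Because the flush message lands behind the casts that were already queued, a single flush covers all of them, while an uncontended update is no longer held back by a fixed delay.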
msfstef authored Jan 29, 2025
1 parent a09a7a7 commit eccdf9f
Showing 3 changed files with 31 additions and 16 deletions.
7 changes: 7 additions & 0 deletions .changeset/healthy-seas-bathe.md
@@ -0,0 +1,7 @@
+---
+"@core/sync-service": patch
+---
+
+- Do not await for responses while recovering publication filters.
+- Remove publication update debounce time - simply wait until end of current process message queue.

@@ -52,7 +52,12 @@ defmodule Electric.Replication.PublicationManager do

   @retry_timeout 300
   @max_retries 3
-  @default_debounce_timeout 50
+
+  # The default debounce timeout is 0, which means that the publication update
+  # will be scheduled immediately to run at the end of the current process
+  # mailbox, but we are leaving this configurable in case we want larger
+  # windows to aggregate shape filter updates
+  @default_debounce_timeout 0

   @relation_counter :relation_counter
   @relation_where :relation_where
@@ -97,7 +102,8 @@ defmodule Electric.Replication.PublicationManager do
   @spec recover_shape(Shape.t(), Keyword.t()) :: :ok
   def recover_shape(shape, opts \\ []) do
     server = Access.get(opts, :server, name(opts))
-    GenServer.call(server, {:recover_shape, shape})
+    GenServer.cast(server, {:recover_shape, shape})
+    :ok
   end

   @spec remove_shape(Shape.t(), Keyword.t()) :: :ok
@@ -113,11 +119,8 @@ defmodule Electric.Replication.PublicationManager do
   @spec refresh_publication(Keyword.t()) :: :ok
   def refresh_publication(opts \\ []) do
     server = Access.get(opts, :server, name(opts))
-
-    case GenServer.call(server, :refresh_publication) do
-      :ok -> :ok
-      {:error, err} -> raise err
-    end
+    GenServer.cast(server, :refresh_publication)
+    :ok
   end

   def start_link(opts) do
@@ -175,20 +178,20 @@ defmodule Electric.Replication.PublicationManager do
     {:noreply, state}
   end

-  def handle_call({:recover_shape, shape}, _from, state) do
-    state = update_relation_filters_for_shape(shape, :add, state)
-    {:reply, :ok, state}
-  end
-
   def handle_call({:remove_shape, shape}, from, state) do
     state = update_relation_filters_for_shape(shape, :remove, state)
     state = add_waiter(from, state)
     state = schedule_update_publication(state.update_debounce_timeout, state)
     {:noreply, state}
   end

-  def handle_call(:refresh_publication, from, state) do
-    state = add_waiter(from, state)
+  @impl true
+  def handle_cast({:recover_shape, shape}, state) do
+    state = update_relation_filters_for_shape(shape, :add, state)
+    {:noreply, state}
+  end
+
+  def handle_cast(:refresh_publication, state) do
     state = schedule_update_publication(state.update_debounce_timeout, state)
     {:noreply, state}
   end
@@ -265,7 +268,12 @@ defmodule Electric.Replication.PublicationManager do
         %{state | pg_version: pg_version}

       {:error, err} ->
-        Logger.error("Failed to get PG version, retrying after timeout: #{inspect(err)}")
+        err_msg = "Failed to get PG version, retrying after timeout: #{inspect(err)}"
+
+        if %DBConnection.ConnectionError{reason: :queue_timeout} == err,
+          do: Logger.warning(err_msg),
+          else: Logger.error(err_msg)
+
         Process.sleep(@retry_timeout)
         get_pg_version(state)
     end
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shape_cache.ex
@@ -316,7 +316,7 @@ defmodule Electric.ShapeCache do
         {:ok, _pid, _snapshot_xmin, _latest_offset} = start_shape(shape_handle, shape, state)

         # recover publication filter state
-        :ok = publication_manager.recover_shape(shape, publication_manager_opts)
+        publication_manager.recover_shape(shape, publication_manager_opts)
       rescue
         e ->
           Logger.error("Failed to recover shape #{shape_handle}: #{inspect(e)}")