Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Do not await for responses when recovering pub filters #2265

Merged
merged 5 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/healthy-seas-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@core/sync-service": patch
---

- Do not await for responses while recovering publication filters.
- Remove publication update debounce time - simply wait until end of current process message queue.

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Electric.Replication.PublicationManager do

@retry_timeout 300
@max_retries 3
@default_debounce_timeout 50
@default_debounce_timeout 0
msfstef marked this conversation as resolved.
Show resolved Hide resolved

@relation_counter :relation_counter
@relation_where :relation_where
Expand Down Expand Up @@ -97,7 +97,8 @@ defmodule Electric.Replication.PublicationManager do
@spec recover_shape(Shape.t(), Keyword.t()) :: :ok
def recover_shape(shape, opts \\ []) do
server = Access.get(opts, :server, name(opts))
GenServer.call(server, {:recover_shape, shape})
GenServer.cast(server, {:recover_shape, shape})
:ok
end

@spec remove_shape(Shape.t(), Keyword.t()) :: :ok
Expand All @@ -113,11 +114,8 @@ defmodule Electric.Replication.PublicationManager do
@spec refresh_publication(Keyword.t()) :: :ok
def refresh_publication(opts \\ []) do
server = Access.get(opts, :server, name(opts))

case GenServer.call(server, :refresh_publication) do
:ok -> :ok
{:error, err} -> raise err
end
GenServer.cast(server, :refresh_publication)
:ok
end

def start_link(opts) do
Expand Down Expand Up @@ -175,20 +173,20 @@ defmodule Electric.Replication.PublicationManager do
{:noreply, state}
end

def handle_call({:recover_shape, shape}, _from, state) do
state = update_relation_filters_for_shape(shape, :add, state)
{:reply, :ok, state}
end

def handle_call({:remove_shape, shape}, from, state) do
state = update_relation_filters_for_shape(shape, :remove, state)
state = add_waiter(from, state)
state = schedule_update_publication(state.update_debounce_timeout, state)
{:noreply, state}
end

def handle_call(:refresh_publication, from, state) do
state = add_waiter(from, state)
@impl true
def handle_cast({:recover_shape, shape}, state) do
state = update_relation_filters_for_shape(shape, :add, state)
{:noreply, state}
end

def handle_cast(:refresh_publication, state) do
state = schedule_update_publication(state.update_debounce_timeout, state)
{:noreply, state}
end
Expand Down Expand Up @@ -265,7 +263,12 @@ defmodule Electric.Replication.PublicationManager do
%{state | pg_version: pg_version}

{:error, err} ->
Logger.error("Failed to get PG version, retrying after timeout: #{inspect(err)}")
err_msg = "Failed to get PG version, retrying after timeout: #{inspect(err)}"

if %DBConnection.ConnectionError{reason: :queue_timeout} == err,
do: Logger.warning(err_msg),
else: Logger.error(err_msg)

Process.sleep(@retry_timeout)
get_pg_version(state)
end
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ defmodule Electric.ShapeCache do
{:ok, _pid, _snapshot_xmin, _latest_offset} = start_shape(shape_handle, shape, state)

# recover publication filter state
:ok = publication_manager.recover_shape(shape, publication_manager_opts)
publication_manager.recover_shape(shape, publication_manager_opts)
rescue
e ->
Logger.error("Failed to recover shape #{shape_handle}: #{inspect(e)}")
Expand Down
Loading