Skip to content

Commit

Permalink
Improve error handling when server goes down
Browse files Browse the repository at this point in the history
Rearranged children order so consumers don't go down due to rest_for_one strategy.
  • Loading branch information
Maruika Wei committed Aug 4, 2020
1 parent ce729a5 commit 877b952
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
12 changes: 6 additions & 6 deletions lib/itk/queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ defmodule ITKQueue do
[
ITKQueue.ConnectionPool,
ITKQueue.PublisherPool,
%{
id: ITKQueue.RetryPublisher,
start: {ITKQueue.RetryPublisher, :start_link, [[], ITKQueue.RetryPublisher]}
},
ITKQueue.ConsumerSupervisor,
%{
id: ITKQueue.ConsumerConnection,
start:
Expand All @@ -51,8 +48,11 @@ defmodule ITKQueue do
ITKQueue.ConsumerConnection
]}
},
ITKQueue.ConsumerSupervisor,
ITKQueue.Workers
ITKQueue.Workers,
%{
id: ITKQueue.RetryPublisher,
start: {ITKQueue.RetryPublisher, :start_link, [[], ITKQueue.RetryPublisher]}
}
]
end

Expand Down
6 changes: 3 additions & 3 deletions lib/itk/queue/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ defmodule ITKQueue.Consumer do
connection -> {:ok, connection}
end
catch
_ ->
_, _ ->
{:error, :connection_lost}
end

Expand All @@ -107,8 +107,8 @@ defmodule ITKQueue.Consumer do

{:ok, _} = AMQP.Basic.consume(channel, queue_name, self())
{:ok, channel}
rescue
e -> e
catch
_, e -> e
end

defp subscribe(subscription = %Subscription{queue_name: queue_name, routing_key: routing_key}) do
Expand Down
4 changes: 4 additions & 0 deletions lib/itk/queue/pub_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ defmodule ITKQueue.PubChannel do
Logger.info("Publisher channel #{inspect(self())} opened on conn #{inspect(conn)}")
{:noreply, %{state | chan: chan, status: :connected}}
end)
catch
_, e ->
Process.send_after(self(), :connect, @reconnect_interval)
{:noreply, state}
end

def handle_info(:confirm, state = %{status: :connected, chan: chan, pending: pending}) do
Expand Down
5 changes: 5 additions & 0 deletions lib/itk/queue/retry_publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ defmodule ITKQueue.RetryPublisher do
Logger.info("RetryPublisher channel #{inspect(self())} opened on conn #{inspect(conn)}")
{:noreply, %{state | chan: chan, status: :connected}}
end)
catch
_, e ->
Logger.warn("RetryPublisher connect error: #{inspect(e)}")
Process.send_after(self(), :connect, @reconnect_interval)
{:noreply, state}
end

def handle_info(:confirm, state = %{status: :connected, chan: chan}) do
Expand Down

0 comments on commit 877b952

Please sign in to comment.