From 877b9527c6be9c8a733bb0d5ba91e8d7093f13b0 Mon Sep 17 00:00:00 2001 From: Maruika Wei Date: Tue, 4 Aug 2020 16:08:30 -0400 Subject: [PATCH] Improve error handling when server goes down Rearranged children order so consumers don't go down due to rest_for_one strategy. --- lib/itk/queue.ex | 12 ++++++------ lib/itk/queue/consumer.ex | 6 +++--- lib/itk/queue/pub_channel.ex | 4 ++++ lib/itk/queue/retry_publisher.ex | 5 +++++ 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/lib/itk/queue.ex b/lib/itk/queue.ex index fa4870c..f792bef 100644 --- a/lib/itk/queue.ex +++ b/lib/itk/queue.ex @@ -38,10 +38,7 @@ defmodule ITKQueue do [ ITKQueue.ConnectionPool, ITKQueue.PublisherPool, - %{ - id: ITKQueue.RetryPublisher, - start: {ITKQueue.RetryPublisher, :start_link, [[], ITKQueue.RetryPublisher]} - }, + ITKQueue.ConsumerSupervisor, %{ id: ITKQueue.ConsumerConnection, start: @@ -51,8 +48,11 @@ defmodule ITKQueue do ITKQueue.ConsumerConnection ]} }, - ITKQueue.ConsumerSupervisor, - ITKQueue.Workers + ITKQueue.Workers, + %{ + id: ITKQueue.RetryPublisher, + start: {ITKQueue.RetryPublisher, :start_link, [[], ITKQueue.RetryPublisher]} + } ] end diff --git a/lib/itk/queue/consumer.ex b/lib/itk/queue/consumer.ex index 61ff746..aaa75bd 100644 --- a/lib/itk/queue/consumer.ex +++ b/lib/itk/queue/consumer.ex @@ -93,7 +93,7 @@ defmodule ITKQueue.Consumer do connection -> {:ok, connection} end catch - _ -> + _, _ -> {:error, :connection_lost} end @@ -107,8 +107,8 @@ defmodule ITKQueue.Consumer do {:ok, _} = AMQP.Basic.consume(channel, queue_name, self()) {:ok, channel} - rescue - e -> e + catch + _, e -> e end defp subscribe(subscription = %Subscription{queue_name: queue_name, routing_key: routing_key}) do diff --git a/lib/itk/queue/pub_channel.ex b/lib/itk/queue/pub_channel.ex index a90c4a0..574615d 100644 --- a/lib/itk/queue/pub_channel.ex +++ b/lib/itk/queue/pub_channel.ex @@ -49,6 +49,10 @@ defmodule ITKQueue.PubChannel do Logger.info("Publisher channel #{inspect(self())} opened on conn #{inspect(conn)}") {:noreply, %{state | chan: chan, status: :connected}} end) + catch + _, e -> + Process.send_after(self(), :connect, @reconnect_interval) + {:noreply, state} end def handle_info(:confirm, state = %{status: :connected, chan: chan, pending: pending}) do diff --git a/lib/itk/queue/retry_publisher.ex b/lib/itk/queue/retry_publisher.ex index f1f0ce1..ccf3e50 100644 --- a/lib/itk/queue/retry_publisher.ex +++ b/lib/itk/queue/retry_publisher.ex @@ -44,6 +44,11 @@ defmodule ITKQueue.RetryPublisher do Logger.info("RetryPublisher channel #{inspect(self())} opened on conn #{inspect(conn)}") {:noreply, %{state | chan: chan, status: :connected}} end) + catch + _, e -> + Logger.warn("RetryPublisher connect error: #{inspect(e)}") + Process.send_after(self(), :connect, @reconnect_interval) + {:noreply, state} end def handle_info(:confirm, state = %{status: :connected, chan: chan}) do