From 36722ca941f3b7ab2412784b5bd14ae87d72902b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Jan=20Niemier?= Date: Tue, 3 Dec 2024 21:33:18 +0100 Subject: [PATCH] chore: extract handling network data to separate function This approach make data handling more concise and less prone to errors. This is also will make extraction of different working modes easier in the future refactoring. --- lib/supavisor/client_handler.ex | 207 +++++++++++++++++--------------- 1 file changed, 112 insertions(+), 95 deletions(-) diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 5ce3afb3..aefbaca8 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -506,103 +506,21 @@ defmodule Supavisor.ClientHandler do {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}} end - # handle Terminate message - def handle_event(:info, {proto, _, <>}, :idle, _) - when proto in @proto do - Logger.info("ClientHandler: Terminate received from client") - {:stop, {:shutdown, :terminate_received}} - end - - # handle Sync message - def handle_event(:info, {proto, _, <> = msg}, :idle, data) - when proto in @proto do - Logger.debug("ClientHandler: Receive sync") - - # db_pid can be nil in transaction mode, so we will send ready_for_query - # without checking out a direct connection. If there is a linked db_pid, - # we will forward the message to it - if data.db_pid != nil, - do: :ok = sock_send_maybe_active_once(msg, data), - else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query()) - - {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} - end - - def handle_event(:info, {proto, _, <> = msg}, _, data) - when proto in @proto do - Logger.debug("ClientHandler: Receive sync while not idle") - :ok = sock_send_maybe_active_once(msg, data) - {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} - end - - # handle Flush message - def handle_event(:info, {proto, _, <> = msg}, _, data) - when proto in @proto do - Logger.debug("ClientHandler: Receive flush while not idle") - :ok = sock_send_maybe_active_once(msg, data) - {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} - end - - # incoming query with a single pool - def handle_event(:info, {proto, _, bin}, :idle, %{pool: pid} = data) - when is_binary(bin) and is_pid(pid) and proto in @proto do - Logger.debug("ClientHandler: Receive query #{inspect(bin)}") - db_pid = db_checkout(:both, :on_query, data) - handle_prepared_statements(db_pid, bin, data) - - {:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()}, - {:next_event, :internal, {proto, nil, bin}}} - end - - def handle_event(:info, {proto, _, bin}, _, %{mode: :proxy} = data) when proto in @proto do - {:next_state, :busy, %{data | query_start: System.monotonic_time()}, - {:next_event, :internal, {proto, nil, bin}}} - end - - # incoming query with read/write pools - def handle_event(:info, {proto, _, bin}, :idle, data) when proto in @proto do - query_type = - with {:ok, payload} <- Client.get_payload(bin), - {:ok, statements} <- Supavisor.PgParser.statements(payload) do - Logger.debug( - "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}" - ) + def handle_event(kind, {proto, socket, msg}, state, data) + when proto in @proto and is_binary(msg) do + with {:next_state, next_state, new_data, actions} <- handle_data(kind, msg, state, data) do + new_actions = + actions + |> List.wrap() + |> Enum.map(fn + {:next_event, type, bin} when is_binary(bin) -> + {:next_event, type, {proto, socket, bin}} - case statements do - # naive check for read only queries - ["SelectStmt"] -> :read - _ -> :write - end - else - other -> - Logger.error("ClientHandler: Receive query error: #{inspect(other)}") - :write - end - - ts = System.monotonic_time() - db_pid = db_checkout(query_type, :on_query, data) - - {:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin}, - {:next_event, :internal, {proto, nil, bin}}} - end - - # forward query to db - def handle_event(_, {proto, _, bin}, :busy, data) when proto in @proto do - Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}") - - case sock_send_maybe_active_once(bin, data) do - :ok -> - {:keep_state, %{data | active_count: data.active_count + 1}} - - error -> - Logger.error("ClientHandler: error while sending query: #{inspect(error)}") - - HandlerHelpers.sock_send( - data.sock, - Server.error_message("XX000", "Error while sending query") - ) + other -> + other + end) - {:stop, {:shutdown, :send_query_error}} + {:next_state, next_state, new_data, new_actions} end end @@ -1048,6 +966,105 @@ defmodule Supavisor.ClientHandler do end end + @spec handle_data(kind :: atom(), data :: binary(), state, data) :: + :gen_statem.event_handler_result(data) + when state: atom() | term(), + data: term() + + # handle Terminate message + defp handle_data(:info, <>, :idle, _data) do + Logger.info("ClientHandler: Terminate received from client") + {:stop, {:shutdown, :terminate_received}} + end + + defp handle_data(:info, <> <> _ = msg, :idle, data) do + Logger.debug("ClientHandler: Receive sync") + + # db_pid can be nil in transaction mode, so we will send ready_for_query + # without checking out a direct connection. If there is a linked db_pid, + # we will forward the message to it + if data.db_pid != nil, + do: :ok = sock_send_maybe_active_once(msg, data), + else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query()) + + {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} + end + + defp handle_data(:info, <> = msg, _, data) do + Logger.debug("ClientHandler: Receive sync while not idle") + :ok = sock_send_maybe_active_once(msg, data) + {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} + end + + # handle Flush message + defp handle_data(:info, <> = msg, _, data) do + Logger.debug("ClientHandler: Receive flush while not idle") + :ok = sock_send_maybe_active_once(msg, data) + {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)} + end + + # incoming query with a single pool + defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do + Logger.debug("ClientHandler: Receive query #{inspect(bin)}") + db_pid = db_checkout(:both, :on_query, data) + handle_prepared_statements(db_pid, bin, data) + + {:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()}, + {:next_event, :internal, bin}} + end + + defp handle_data(:info, bin, _, %{mode: :proxy} = data) do + {:next_state, :busy, %{data | query_start: System.monotonic_time()}, + {:next_event, :internal, bin}} + end + + # incoming query with read/write pools + defp handle_data(:info, bin, :idle, data) do + query_type = + with {:ok, payload} <- Client.get_payload(bin), + {:ok, statements} <- Supavisor.PgParser.statements(payload) do + Logger.debug( + "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}" + ) + + case statements do + # naive check for read only queries + ["SelectStmt"] -> :read + _ -> :write + end + else + other -> + Logger.error("ClientHandler: Receive query error: #{inspect(other)}") + :write + end + + ts = System.monotonic_time() + db_pid = db_checkout(query_type, :on_query, data) + + {:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin}, + {:next_event, :internal, bin}} + end + + # forward query to db + defp handle_data(_, bin, :busy, data) do + Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}") + + case sock_send_maybe_active_once(bin, data) do + :ok -> + {:keep_state, %{data | active_count: data.active_count + 1}} + + error -> + Logger.error("ClientHandler: error while sending query: #{inspect(error)}") + + HandlerHelpers.sock_send( + data.sock, + Server.error_message("XX000", "Error while sending query") + ) + + {:stop, {:shutdown, :send_query_error}} + end + end + @spec handle_prepared_statements({pid, pid, Supavisor.sock()}, binary, map) :: :ok | nil defp handle_prepared_statements({_, pid, _}, bin, %{mode: :transaction} = data) do with {:ok, payload} <- Client.get_payload(bin),