diff --git a/VERSION b/VERSION index b0bb8785..85b7c695 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.5 +0.9.6 diff --git a/lib/supavisor.ex b/lib/supavisor.ex index adfb10ec..7509c5c9 100644 --- a/lib/supavisor.ex +++ b/lib/supavisor.ex @@ -11,6 +11,7 @@ defmodule Supavisor do @type secrets :: {:password | :auth_query, fun()} @type mode :: :transaction | :session @type id :: {String.t(), String.t(), mode} + @type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer} @registry Supavisor.Registry.Tenants @@ -48,18 +49,18 @@ defmodule Supavisor do end end - @spec subscribe_local(pid, id) :: {:ok, workers, iodata()} | {:error, any()} - def subscribe_local(pid, id) do + @spec subscribe_local(pid, id) :: {:ok, subscribe_opts} | {:error, any()} + def(subscribe_local(pid, id)) do with {:ok, workers} <- get_local_workers(id), - {:ok, ps} <- Manager.subscribe(workers.manager, pid) do - {:ok, workers, ps} + {:ok, ps, idle_timeout} <- Manager.subscribe(workers.manager, pid) do + {:ok, %{workers: workers, ps: ps, idle_timeout: idle_timeout}} else error -> error end end - @spec subscribe(pid, id, pid) :: {:ok, workers, iodata()} | {:error, any()} + @spec subscribe(pid, id, pid) :: {:ok, subscribe_opts} | {:error, any()} def subscribe(sup, id, pid \\ self()) do dest_node = node(sup) @@ -169,6 +170,7 @@ defmodule Supavisor do ip_version: ip_ver, default_pool_size: def_pool_size, default_max_clients: def_max_clients, + client_idle_timeout: client_idle_timeout, users: [ %{ db_user: db_user, @@ -210,7 +212,8 @@ defmodule Supavisor do pool_size: pool_size, mode: mode, default_parameter_status: ps, - max_clients: max_clients + max_clients: max_clients, + client_idle_timeout: client_idle_timeout } DynamicSupervisor.start_child( diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index 90d60a2e..8057712d 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -40,7 +40,7 @@ defmodule Supavisor.Application do %{ max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "25000"), num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), - socket_opts: [port: port] + socket_opts: [port: port, keepalive: true] }, Supavisor.ClientHandler, %{mode: mode} diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index c95cfc1a..0f3a5db1 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -55,7 +55,8 @@ defmodule Supavisor.ClientHandler do auth_secrets: nil, proxy_type: nil, mode: opts.mode, - stats: %{} + stats: %{}, + idle_timeout: 0 } :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, data) @@ -184,17 +185,17 @@ defmodule Supavisor.ClientHandler do Logger.debug("Subscribe to tenant #{inspect(data.id)}") with {:ok, sup} <- Supavisor.start(data.id, data.auth_secrets), - {:ok, workers, ps} <- Supavisor.subscribe(sup, data.id) do - Process.monitor(workers.manager) - data = Map.merge(data, workers) + {:ok, opts} <- Supavisor.subscribe(sup, data.id) do + Process.monitor(opts.workers.manager) + data = Map.merge(data, opts.workers) db_pid = db_checkout(:on_connect, data) - data = %{data | db_pid: db_pid} + data = %{data | db_pid: db_pid, idle_timeout: opts.idle_timeout} next = - if ps == [] do + if opts.ps == [] do {:timeout, 10_000, :wait_ps} else - {:next_event, :internal, {:greetings, ps}} + {:next_event, :internal, {:greetings, opts.ps}} end {:keep_state, data, next} @@ -213,7 +214,12 @@ defmodule Supavisor.ClientHandler do def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do :ok = sock_send(sock, Server.greetings(ps)) - {:next_state, :idle, data} + + if data.idle_timeout > 0 do + {:next_state, :idle, data, idle_check(data.idle_timeout)} + else + {:next_state, :idle, data} + end end def handle_event(:timeout, :subscribe, _, _) do @@ -227,10 +233,27 @@ defmodule Supavisor.ClientHandler do {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}} end - # ignore termination messages - def handle_event(:info, {proto, _, <>}, _, _) when proto in [:tcp, :ssl] do + def handle_event(:timeout, :idle_terminate, _, data) do + Logger.warning("Terminate an idle connection by #{data.idle_timeout} timeout") + {:stop, :normal, data} + end + + # handle Terminate message + def handle_event(:info, {proto, _, <>}, :idle, data) when proto in [:tcp, :ssl] do Logger.debug("Receive termination") - :keep_state_and_data + {:stop, :normal, data} + end + + # handle Sync message + def handle_event(:info, {proto, _, <>}, :idle, data) when proto in [:tcp, :ssl] do + Logger.debug("Receive sync") + :ok = sock_send(data.sock, Server.ready_for_query()) + + if data.idle_timeout > 0 do + {:keep_state_and_data, idle_check(data.idle_timeout)} + else + :keep_state_and_data + end end def handle_event(:info, {proto, _, bin}, :idle, data) do @@ -278,11 +301,6 @@ defmodule Supavisor.ClientHandler do end # client closed connection - def handle_event(_, {:tcp_closed, _}, _, data) do - Logger.debug("tcp soket closed for #{inspect(data.tenant)}") - {:stop, :normal} - end - def handle_event(_, {closed, _}, _, data) when closed in [:tcp_closed, :ssl_closed] do Logger.debug("#{closed} soket closed for #{inspect(data.tenant)}") @@ -323,7 +341,15 @@ defmodule Supavisor.ClientHandler do {_, stats} = Telem.network_usage(:client, data.sock, data.id, data.stats) Telem.client_query_time(data.query_start, data.id) - {:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, reply} + + actions = + if data.idle_timeout > 0 do + [reply, idle_check(data.idle_timeout)] + else + reply + end + + {:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions} else Logger.debug("Client is not ready") {:keep_state_and_data, reply} @@ -650,4 +676,9 @@ defmodule Supavisor.ClientHandler do end def try_get_sni(_), do: nil + + @spec idle_check(non_neg_integer) :: {:timeout, non_neg_integer, :idle_terminate} + defp idle_check(timeout) do + {:timeout, timeout, :idle_terminate} + end end diff --git a/lib/supavisor/manager.ex b/lib/supavisor/manager.ex index 48920275..19fc15d9 100644 --- a/lib/supavisor/manager.ex +++ b/lib/supavisor/manager.ex @@ -14,7 +14,7 @@ defmodule Supavisor.Manager do GenServer.start_link(__MODULE__, args, name: name) end - @spec subscribe(pid, pid) :: {:ok, iodata() | []} | {:error, :max_clients_reached} + @spec subscribe(pid, pid) :: {:ok, iodata() | [], integer} | {:error, :max_clients_reached} def subscribe(manager, pid) do GenServer.call(manager, {:subscribe, pid}) end @@ -43,7 +43,8 @@ defmodule Supavisor.Manager do parameter_status: [], wait_ps: [], default_parameter_status: args.default_parameter_status, - max_clients: args.max_clients + max_clients: args.max_clients, + idle_timeout: args.client_idle_timeout } {tenant, user, _mode} = args.id @@ -64,10 +65,10 @@ defmodule Supavisor.Manager do case state.parameter_status do [] -> - {{:ok, []}, update_in(state.wait_ps, &[pid | &1])} + {{:ok, [], state.idle_timeout}, update_in(state.wait_ps, &[pid | &1])} ps -> - {{:ok, ps}, state} + {{:ok, ps, state.idle_timeout}, state} end else {{:error, :max_clients_reached}, state} diff --git a/lib/supavisor/tenants/tenant.ex b/lib/supavisor/tenants/tenant.ex index 6b02ecd7..1106ba09 100644 --- a/lib/supavisor/tenants/tenant.ex +++ b/lib/supavisor/tenants/tenant.ex @@ -26,6 +26,7 @@ defmodule Supavisor.Tenants.Tenant do field(:default_pool_size, :integer, default: 15) field(:sni_hostname, :string) field(:default_max_clients, :integer, default: 1000) + field(:client_idle_timeout, :integer, default: 0) has_many(:users, User, foreign_key: :tenant_external_id, @@ -55,7 +56,8 @@ defmodule Supavisor.Tenants.Tenant do :auth_query, :default_pool_size, :sni_hostname, - :default_max_clients + :default_max_clients, + :client_idle_timeout ]) |> check_constraint(:upstream_ssl, name: :upstream_constraints, prefix: "_supavisor") |> check_constraint(:upstream_verify, name: :upstream_constraints, prefix: "_supavisor") diff --git a/lib/supavisor_web/views/tenant_view.ex b/lib/supavisor_web/views/tenant_view.ex index f9b86270..27936e40 100644 --- a/lib/supavisor_web/views/tenant_view.ex +++ b/lib/supavisor_web/views/tenant_view.ex @@ -26,6 +26,7 @@ defmodule SupavisorWeb.TenantView do auth_query: tenant.auth_query, sni_hostname: tenant.sni_hostname, default_max_clients: tenant.default_max_clients, + client_idle_timeout: tenant.client_idle_timeout, users: render_many(tenant.users, UserView, "user.json") } end diff --git a/priv/repo/migrations/20230914102712_add_client_idle_timeout.exs b/priv/repo/migrations/20230914102712_add_client_idle_timeout.exs new file mode 100644 index 00000000..c43e31e7 --- /dev/null +++ b/priv/repo/migrations/20230914102712_add_client_idle_timeout.exs @@ -0,0 +1,9 @@ +defmodule Supavisor.Repo.Migrations.AddClientIdleTimeout do + use Ecto.Migration + + def change do + alter table("tenants", prefix: "_supavisor") do + add(:client_idle_timeout, :integer, null: false, default: 0) + end + end +end