Skip to content

Commit

Permalink
feat: add client_idle_timeout option (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
abc3 authored Sep 18, 2023
1 parent d547b2a commit d4c6d65
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 30 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.5
0.9.6
15 changes: 9 additions & 6 deletions lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Supavisor do
@type secrets :: {:password | :auth_query, fun()}
@type mode :: :transaction | :session
@type id :: {String.t(), String.t(), mode}
@type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer}

@registry Supavisor.Registry.Tenants

Expand Down Expand Up @@ -48,18 +49,18 @@ defmodule Supavisor do
end
end

@spec subscribe_local(pid, id) :: {:ok, workers, iodata()} | {:error, any()}
def subscribe_local(pid, id) do
@spec subscribe_local(pid, id) :: {:ok, subscribe_opts} | {:error, any()}
def(subscribe_local(pid, id)) do
with {:ok, workers} <- get_local_workers(id),
{:ok, ps} <- Manager.subscribe(workers.manager, pid) do
{:ok, workers, ps}
{:ok, ps, idle_timeout} <- Manager.subscribe(workers.manager, pid) do
{:ok, %{workers: workers, ps: ps, idle_timeout: idle_timeout}}
else
error ->
error
end
end

@spec subscribe(pid, id, pid) :: {:ok, workers, iodata()} | {:error, any()}
@spec subscribe(pid, id, pid) :: {:ok, subscribe_opts} | {:error, any()}
def subscribe(sup, id, pid \\ self()) do
dest_node = node(sup)

Expand Down Expand Up @@ -169,6 +170,7 @@ defmodule Supavisor do
ip_version: ip_ver,
default_pool_size: def_pool_size,
default_max_clients: def_max_clients,
client_idle_timeout: client_idle_timeout,
users: [
%{
db_user: db_user,
Expand Down Expand Up @@ -210,7 +212,8 @@ defmodule Supavisor do
pool_size: pool_size,
mode: mode,
default_parameter_status: ps,
max_clients: max_clients
max_clients: max_clients,
client_idle_timeout: client_idle_timeout
}

DynamicSupervisor.start_child(
Expand Down
2 changes: 1 addition & 1 deletion lib/supavisor/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Supavisor.Application do
%{
max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "25000"),
num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"),
socket_opts: [port: port]
socket_opts: [port: port, keepalive: true]
},
Supavisor.ClientHandler,
%{mode: mode}
Expand Down
65 changes: 48 additions & 17 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ defmodule Supavisor.ClientHandler do
auth_secrets: nil,
proxy_type: nil,
mode: opts.mode,
stats: %{}
stats: %{},
idle_timeout: 0
}

:gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, data)
Expand Down Expand Up @@ -184,17 +185,17 @@ defmodule Supavisor.ClientHandler do
Logger.debug("Subscribe to tenant #{inspect(data.id)}")

with {:ok, sup} <- Supavisor.start(data.id, data.auth_secrets),
{:ok, workers, ps} <- Supavisor.subscribe(sup, data.id) do
Process.monitor(workers.manager)
data = Map.merge(data, workers)
{:ok, opts} <- Supavisor.subscribe(sup, data.id) do
Process.monitor(opts.workers.manager)
data = Map.merge(data, opts.workers)
db_pid = db_checkout(:on_connect, data)
data = %{data | db_pid: db_pid}
data = %{data | db_pid: db_pid, idle_timeout: opts.idle_timeout}

next =
if ps == [] do
if opts.ps == [] do
{:timeout, 10_000, :wait_ps}
else
{:next_event, :internal, {:greetings, ps}}
{:next_event, :internal, {:greetings, opts.ps}}
end

{:keep_state, data, next}
Expand All @@ -213,7 +214,12 @@ defmodule Supavisor.ClientHandler do

def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do
:ok = sock_send(sock, Server.greetings(ps))
{:next_state, :idle, data}

if data.idle_timeout > 0 do
{:next_state, :idle, data, idle_check(data.idle_timeout)}
else
{:next_state, :idle, data}
end
end

def handle_event(:timeout, :subscribe, _, _) do
Expand All @@ -227,10 +233,27 @@ defmodule Supavisor.ClientHandler do
{:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
end

# ignore termination messages
def handle_event(:info, {proto, _, <<?X, 4::32>>}, _, _) when proto in [:tcp, :ssl] do
def handle_event(:timeout, :idle_terminate, _, data) do
Logger.warning("Terminate an idle connection by #{data.idle_timeout} timeout")
{:stop, :normal, data}
end

# handle Terminate message
def handle_event(:info, {proto, _, <<?X, 4::32>>}, :idle, data) when proto in [:tcp, :ssl] do
Logger.debug("Receive termination")
:keep_state_and_data
{:stop, :normal, data}
end

# handle Sync message
def handle_event(:info, {proto, _, <<?S, 4::32>>}, :idle, data) when proto in [:tcp, :ssl] do
Logger.debug("Receive sync")
:ok = sock_send(data.sock, Server.ready_for_query())

if data.idle_timeout > 0 do
{:keep_state_and_data, idle_check(data.idle_timeout)}
else
:keep_state_and_data
end
end

def handle_event(:info, {proto, _, bin}, :idle, data) do
Expand Down Expand Up @@ -278,11 +301,6 @@ defmodule Supavisor.ClientHandler do
end

# client closed connection
def handle_event(_, {:tcp_closed, _}, _, data) do
Logger.debug("tcp soket closed for #{inspect(data.tenant)}")
{:stop, :normal}
end

def handle_event(_, {closed, _}, _, data)
when closed in [:tcp_closed, :ssl_closed] do
Logger.debug("#{closed} soket closed for #{inspect(data.tenant)}")
Expand Down Expand Up @@ -323,7 +341,15 @@ defmodule Supavisor.ClientHandler do

{_, stats} = Telem.network_usage(:client, data.sock, data.id, data.stats)
Telem.client_query_time(data.query_start, data.id)
{:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, reply}

actions =
if data.idle_timeout > 0 do
[reply, idle_check(data.idle_timeout)]
else
reply
end

{:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions}
else
Logger.debug("Client is not ready")
{:keep_state_and_data, reply}
Expand Down Expand Up @@ -650,4 +676,9 @@ defmodule Supavisor.ClientHandler do
end

def try_get_sni(_), do: nil

@spec idle_check(non_neg_integer) :: {:timeout, non_neg_integer, :idle_terminate}
defp idle_check(timeout) do
{:timeout, timeout, :idle_terminate}
end
end
9 changes: 5 additions & 4 deletions lib/supavisor/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Supavisor.Manager do
GenServer.start_link(__MODULE__, args, name: name)
end

@spec subscribe(pid, pid) :: {:ok, iodata() | []} | {:error, :max_clients_reached}
@spec subscribe(pid, pid) :: {:ok, iodata() | [], integer} | {:error, :max_clients_reached}
def subscribe(manager, pid) do
GenServer.call(manager, {:subscribe, pid})
end
Expand Down Expand Up @@ -43,7 +43,8 @@ defmodule Supavisor.Manager do
parameter_status: [],
wait_ps: [],
default_parameter_status: args.default_parameter_status,
max_clients: args.max_clients
max_clients: args.max_clients,
idle_timeout: args.client_idle_timeout
}

{tenant, user, _mode} = args.id
Expand All @@ -64,10 +65,10 @@ defmodule Supavisor.Manager do

case state.parameter_status do
[] ->
{{:ok, []}, update_in(state.wait_ps, &[pid | &1])}
{{:ok, [], state.idle_timeout}, update_in(state.wait_ps, &[pid | &1])}

ps ->
{{:ok, ps}, state}
{{:ok, ps, state.idle_timeout}, state}
end
else
{{:error, :max_clients_reached}, state}
Expand Down
4 changes: 3 additions & 1 deletion lib/supavisor/tenants/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Supavisor.Tenants.Tenant do
field(:default_pool_size, :integer, default: 15)
field(:sni_hostname, :string)
field(:default_max_clients, :integer, default: 1000)
field(:client_idle_timeout, :integer, default: 0)

has_many(:users, User,
foreign_key: :tenant_external_id,
Expand Down Expand Up @@ -55,7 +56,8 @@ defmodule Supavisor.Tenants.Tenant do
:auth_query,
:default_pool_size,
:sni_hostname,
:default_max_clients
:default_max_clients,
:client_idle_timeout
])
|> check_constraint(:upstream_ssl, name: :upstream_constraints, prefix: "_supavisor")
|> check_constraint(:upstream_verify, name: :upstream_constraints, prefix: "_supavisor")
Expand Down
1 change: 1 addition & 0 deletions lib/supavisor_web/views/tenant_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule SupavisorWeb.TenantView do
auth_query: tenant.auth_query,
sni_hostname: tenant.sni_hostname,
default_max_clients: tenant.default_max_clients,
client_idle_timeout: tenant.client_idle_timeout,
users: render_many(tenant.users, UserView, "user.json")
}
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Supavisor.Repo.Migrations.AddClientIdleTimeout do
use Ecto.Migration

def change do
alter table("tenants", prefix: "_supavisor") do
add(:client_idle_timeout, :integer, null: false, default: 0)
end
end
end

0 comments on commit d4c6d65

Please sign in to comment.