diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex
index 05401a3..3389035 100644
--- a/lib/flame/pool.ex
+++ b/lib/flame/pool.ex
@@ -1,12 +1,14 @@
 defmodule FLAME.Pool.RunnerState do
   @moduledoc false

+  @type t :: %__MODULE__{}
   defstruct count: nil, pid: nil, monitor_ref: nil
 end

 defmodule FLAME.Pool.WaitingState do
   @moduledoc false

+  @type t :: %__MODULE__{}
   defstruct from: nil, monitor_ref: nil, deadline: nil
 end

@@ -26,7 +28,7 @@ defmodule FLAME.Pool do

       children = [
         ...,
-        {FLAME.Pool, name: MyRunner, min: 1, max: 10, max_concurrency: 100}
+        {FLAME.Pool, name: MyRunner, min: 1, max: 10}
       ]

   See `start_link/1` for supported options.
@@ -40,11 +42,12 @@ defmodule FLAME.Pool do
   alias FLAME.{Pool, Runner, Queue, CodeSync}
   alias FLAME.Pool.{RunnerState, WaitingState, Caller}

-  @default_max_concurrency 100
+  @default_strategy {Pool.PerRunnerMaxConcurrencyStrategy, [max_concurrency: 100]}
   @boot_timeout 30_000
   @idle_shutdown_after 30_000
   @async_boot_debounce 1_000

+  @type t :: %__MODULE__{}
   defstruct name: nil,
             runner_sup: nil,
             task_sup: nil,
@@ -55,7 +58,7 @@ defmodule FLAME.Pool do
             min_idle_shutdown_after: nil,
             min: nil,
             max: nil,
-            max_concurrency: nil,
+            strategy: nil,
             callers: %{},
             waiting: Queue.new(),
             runners: %{},
@@ -89,8 +92,7 @@ defmodule FLAME.Pool do

     * `:max` - The maximum number of runners to elastically grow to in the pool.

-    * `:max_concurrency` - The maximum number of concurrent executions per runner before
-      booting new runners or queueing calls. Defaults to `100`.
+    * `:strategy` - The runner scaling strategy as a `{module, opts}` tuple. Defaults to `{FLAME.Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 100}`.

     * `:single_use` - if `true`, runners will be terminated after each call completes.
       Defaults `false`.
@@ -183,7 +185,7 @@ defmodule FLAME.Pool do
         ],
         min: 1,
         max: 1,
-        max_concurrency: 10,
+        strategy: {FLAME.Pool.PerRunnerMaxConcurrencyStrategy, [max_concurrency: 10]},
         backend: {FLAME.FlyBackend,
           cpu_kind: "performance", cpus: 4, memory_mb: 8192,
           token: System.fetch_env!("FLY_API_TOKEN"),
@@ -203,7 +205,7 @@ defmodule FLAME.Pool do
       :min_idle_shutdown_after,
       :min,
       :max,
-      :max_concurrency,
+      :strategy,
       :backend,
       :log,
       :single_use,
@@ -417,7 +419,7 @@ defmodule FLAME.Pool do
       boot_timeout: boot_timeout,
       idle_shutdown_after: Keyword.get(opts, :idle_shutdown_after, @idle_shutdown_after),
       min_idle_shutdown_after: Keyword.get(opts, :min_idle_shutdown_after, :infinity),
-      max_concurrency: Keyword.get(opts, :max_concurrency, @default_max_concurrency),
+      strategy: Keyword.get(opts, :strategy, @default_strategy),
       on_grow_start: opts[:on_grow_start],
       on_grow_end: opts[:on_grow_end],
       on_shrink: opts[:on_shrink],
@@ -489,21 +491,16 @@ defmodule FLAME.Pool do
     {:noreply, checkout_runner(state, deadline, from)}
   end

-  defp runner_count(state) do
-    map_size(state.runners) + map_size(state.pending_runners)
+  def runner_count(state) do
+    map_size(state.runners)
   end

-  defp waiting_count(%Pool{waiting: %Queue{} = waiting}) do
+  def waiting_count(%Pool{waiting: %Queue{} = waiting}) do
     Queue.size(waiting)
   end

-  defp min_runner(state) do
-    if map_size(state.runners) == 0 do
-      nil
-    else
-      {_ref, min} = Enum.min_by(state.runners, fn {_, %RunnerState{count: count}} -> count end)
-      min
-    end
+  def pending_count(state) do
+    map_size(state.pending_runners)
   end

   defp replace_caller(state, checkout_ref, caller_pid, child_pid) do
@@ -544,26 +541,17 @@ defmodule FLAME.Pool do
   end

   defp checkout_runner(%Pool{} = state, deadline, from, monitor_ref \\ nil) do
-    min_runner = min_runner(state)
-    runner_count = runner_count(state)
+    {strategy_module, strategy_opts} = state.strategy

-    cond do
-      min_runner && min_runner.count < state.max_concurrency ->
-        reply_runner_checkout(state, min_runner, from, monitor_ref)
+    actions = strategy_module.checkout_runner(state, strategy_opts)

-      runner_count < state.max ->
-        if state.async_boot_timer ||
-             map_size(state.pending_runners) * state.max_concurrency > waiting_count(state) do
-          waiting_in(state, deadline, from)
-        else
-          state
-          |> async_boot_runner()
-          |> waiting_in(deadline, from)
-        end
-
-      true ->
-        waiting_in(state, deadline, from)
-    end
+    Enum.reduce(actions, state, fn action, acc ->
+      case action do
+        :wait -> waiting_in(acc, deadline, from)
+        :scale -> async_boot_runner(acc)
+        {:checkout, runner} -> reply_runner_checkout(acc, runner, from, monitor_ref)
+      end
+    end)
   end

   defp reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do
@@ -630,17 +618,29 @@ defmodule FLAME.Pool do
   end

   defp async_boot_runner(%Pool{on_grow_start: on_grow_start, name: name} = state) do
-    new_count = runner_count(state) + 1
+    {strategy_module, strategy_opts} = state.strategy

-    task =
-      Task.Supervisor.async_nolink(state.task_sup, fn ->
-        if on_grow_start, do: on_grow_start.(%{count: new_count, name: name, pid: self()})
+    current_count = runner_count(state) + pending_count(state)
+    new_count = strategy_module.desired_count(state, strategy_opts)

-        start_child_runner(state)
-      end)
+    num_tasks = max(new_count - current_count, 0)

-    new_pending = Map.put(state.pending_runners, task.ref, task.pid)
-    %Pool{state | pending_runners: new_pending}
+    if num_tasks > 0 do
+      tasks =
+        for _ <- 1..num_tasks do
+          Task.Supervisor.async_nolink(state.task_sup, fn ->
+            if on_grow_start, do: on_grow_start.(%{count: new_count, name: name, pid: self()})
+            start_child_runner(state)
+          end)
+        end
+
+      pending_runners = Map.new(tasks, &{&1.ref, &1.pid})
+      new_pending = Map.merge(state.pending_runners, pending_runners)
+
+      %Pool{state | pending_runners: new_pending}
+    else
+      state
+    end
   end

   defp start_child_runner(%Pool{} = state, runner_opts \\ []) do
@@ -754,8 +754,15 @@ defmodule FLAME.Pool do
   defp handle_down(%Pool{} = state, {:DOWN, ref, :process, pid, reason}) do
     state = maybe_drop_waiting(state, pid)

+    %{
+      callers: callers,
+      runners: runners,
+      pending_runners: pending_runners,
+      strategy: {strategy_module, strategy_opts}
+    } = state
+
     state =
-      case state.callers do
+      case callers do
         %{^pid => %Caller{monitor_ref: ^ref} = caller} ->
           drop_caller(state, pid, caller)

@@ -764,16 +771,16 @@ defmodule FLAME.Pool do
       end

     state =
-      case state.runners do
+      case runners do
         %{^ref => _} -> drop_child_runner(state, ref)
         %{} -> state
       end

-    case state.pending_runners do
+    case pending_runners do
       %{^ref => _} ->
         state = %Pool{state | pending_runners: Map.delete(state.pending_runners, ref)}
         # we rate limit this to avoid many failed async boot attempts
-        if has_unmet_servicable_demand?(state) do
+        if strategy_module.has_unmet_servicable_demand?(state, strategy_opts) do
           state
           |> maybe_on_grow_end(pid, {:exit, reason})
           |> schedule_async_boot_runner()
@@ -787,7 +794,7 @@ defmodule FLAME.Pool do
   end

   defp maybe_on_grow_end(%Pool{on_grow_end: on_grow_end} = state, pid, result) do
-    new_count = runner_count(state)
+    new_count = runner_count(state) + pending_count(state)

     meta = %{count: new_count, name: state.name, pid: pid}

     case result do
@@ -799,17 +806,12 @@ defmodule FLAME.Pool do
   end

   defp maybe_on_shrink(%Pool{} = state) do
-    new_count = runner_count(state)
+    new_count = runner_count(state) + pending_count(state)

     if state.on_shrink, do: state.on_shrink.(%{count: new_count, name: state.name})
     state
   end

-  defp has_unmet_servicable_demand?(%Pool{} = state) do
-    waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and
-      runner_count(state) < state.max
-  end
-
   defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do
     %{^ref => task_pid} = state.pending_runners
     Process.demonitor(ref, [:flush])
@@ -818,25 +820,15 @@ defmodule FLAME.Pool do
     {runner, new_state} = put_runner(new_state, pid)
     new_state = maybe_on_grow_end(new_state, task_pid, :ok)

-    # pop waiting callers up to max_concurrency, but we must handle:
-    # 1. the case where we have no waiting callers
-    # 2. the case where we process a DOWN for the new runner as we pop DOWNs
-    #    looking for fresh waiting
-    # 3. if we still have waiting callers at the end, boot more runners if we have capacity
-    Enum.reduce_while(1..state.max_concurrency, new_state, fn i, acc ->
-      with {:ok, %RunnerState{} = runner} <- Map.fetch(acc.runners, runner.monitor_ref),
-           true <- i <= acc.max_concurrency do
-        case pop_next_waiting_caller(acc) do
-          {%WaitingState{} = next, acc} ->
-            {:cont, reply_runner_checkout(acc, runner, next.from, next.monitor_ref)}
-
-          {nil, acc} ->
-            {:halt, acc}
-        end
-      else
-        _ -> {:halt, acc}
-      end
-    end)
+    {strategy_module, strategy_opts} = state.strategy
+
+    pop = fn state -> pop_next_waiting_caller(state) end
+
+    checkout = fn state, runner, from, monitor_ref ->
+      reply_runner_checkout(state, runner, from, monitor_ref)
+    end
+
+    strategy_module.assign_waiting_callers(new_state, runner, pop, checkout, strategy_opts)
   end

   defp deadline(timeout) when is_integer(timeout) do
diff --git a/lib/flame/pool/per_runner_max_concurrency_strategy.ex b/lib/flame/pool/per_runner_max_concurrency_strategy.ex
new file mode 100644
index 0000000..9d61be3
--- /dev/null
+++ b/lib/flame/pool/per_runner_max_concurrency_strategy.ex
@@ -0,0 +1,79 @@
+defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do
+  alias FLAME.Pool
+  @behaviour FLAME.Pool.Strategy
+
+  def checkout_runner(%Pool{} = pool, opts) do
+    min_runner = min_runner(pool)
+    runner_count = Pool.runner_count(pool) + Pool.pending_count(pool)
+    max_concurrency = Keyword.fetch!(opts, :max_concurrency)
+
+    cond do
+      min_runner && min_runner.count < max_concurrency ->
+        [{:checkout, min_runner}]
+
+      runner_count < pool.max ->
+        if pool.async_boot_timer ||
+             map_size(pool.pending_runners) * max_concurrency > Pool.waiting_count(pool) do
+          [:wait]
+        else
+          [:scale, :wait]
+        end
+
+      true ->
+        [:wait]
+    end
+  end
+
+  def assign_waiting_callers(
+        %Pool{} = pool,
+        %Pool.RunnerState{} = runner,
+        pop_next_waiting_caller,
+        reply_runner_checkout,
+        opts
+      ) do
+    max_concurrency = Keyword.fetch!(opts, :max_concurrency)
+
+    # pop waiting callers up to max_concurrency, but we must handle:
+    # 1. the case where we have no waiting callers
+    # 2. the case where we process a DOWN for the new runner as we pop DOWNs
+    #    looking for fresh waiting
+    {pool, _assigned_concurrency} =
+      Enum.reduce_while(1..max_concurrency, {pool, 0}, fn _i, {pool, assigned_concurrency} ->
+        with {:ok, %Pool.RunnerState{} = runner} <- Map.fetch(pool.runners, runner.monitor_ref),
+             true <- assigned_concurrency <= max_concurrency do
+          case pop_next_waiting_caller.(pool) do
+            {%Pool.WaitingState{} = next, pool} ->
+              pool = reply_runner_checkout.(pool, runner, next.from, next.monitor_ref)
+              {:cont, {pool, assigned_concurrency + 1}}
+
+            {nil, pool} ->
+              {:halt, {pool, assigned_concurrency}}
+          end
+        else
+          _ -> {:halt, {pool, assigned_concurrency}}
+        end
+      end)
+
+    pool
+  end
+
+  def desired_count(%Pool{} = pool, _opts) do
+    Pool.runner_count(pool) + Pool.pending_count(pool) + 1
+  end
+
+  def has_unmet_servicable_demand?(%Pool{} = pool, _opts) do
+    runner_count = Pool.runner_count(pool) + Pool.pending_count(pool)
+    Pool.waiting_count(pool) > 0 and runner_count < pool.max
+  end
+
+  defp min_runner(pool) do
+    if map_size(pool.runners) == 0 do
+      nil
+    else
+      {_ref, min} =
+        Enum.min_by(pool.runners, fn {_, %Pool.RunnerState{count: count}} -> count end)
+
+      min
+    end
+  end
+end
diff --git a/lib/flame/pool/strategy.ex b/lib/flame/pool/strategy.ex
new file mode 100644
index 0000000..7389d86
--- /dev/null
+++ b/lib/flame/pool/strategy.ex
@@ -0,0 +1,27 @@
+defmodule FLAME.Pool.Strategy do
+  alias FLAME.Pool
+
+  @type action ::
+          :wait
+          | :scale
+          | {:checkout, Pool.RunnerState.t()}
+
+  @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: list(action)
+
+  @type pop_next_waiting_caller_fun :: (Pool.t() -> {Pool.WaitingState.t() | nil, Pool.t()})
+  @type reply_runner_checkout_fun ::
+          (Pool.t(), Pool.RunnerState.t(), GenServer.from(), reference() -> Pool.t())
+
+  @callback assign_waiting_callers(
+              state :: Pool.t(),
+              runner :: Pool.RunnerState.t(),
+              pop_next_waiting_caller :: pop_next_waiting_caller_fun(),
+              reply_runner_checkout :: reply_runner_checkout_fun(),
+              opts :: Keyword.t()
+            ) ::
+              Pool.t()
+
+  @callback desired_count(state :: Pool.t(), opts :: Keyword.t()) :: non_neg_integer()
+
+  @callback has_unmet_servicable_demand?(state :: Pool.t(), opts :: Keyword.t()) :: boolean()
+end
diff --git a/test/flame_test.exs b/test/flame_test.exs
index 97ea75e..6abc76f 100644
--- a/test/flame_test.exs
+++ b/test/flame_test.exs
@@ -28,7 +28,11 @@ defmodule FLAME.FLAMETest do
     {:ok, runner_sup: runner_sup, pool_pid: pool_pid}
   end

-  @tag runner: [min: 1, max: 2, max_concurrency: 2]
+  @tag runner: [
+         min: 1,
+         max: 2,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}
+       ]
   test "init boots min runners synchronously and grows on demand",
        %{runner_sup: runner_sup} = config do
     min_pool = Supervisor.which_children(runner_sup)
@@ -58,7 +62,11 @@ defmodule FLAME.FLAMETest do
     assert new_pool == Supervisor.which_children(runner_sup)
   end

-  @tag runner: [min: 0, max: 1, max_concurrency: 2]
+  @tag runner: [
+         min: 0,
+         max: 1,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2}
+       ]
   test "concurrent calls on fully pending runners", %{runner_sup: runner_sup} = config do
     assert Supervisor.which_children(runner_sup) == []

@@ -97,7 +105,7 @@ defmodule FLAME.FLAMETest do
   @tag runner: [
          min: 1,
          max: 2,
-         max_concurrency: 1,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 1},
          on_grow_start: &__MODULE__.on_grow_start/1,
          on_grow_end: &__MODULE__.on_grow_end/2
        ]
@@ -139,7 +147,12 @@ defmodule FLAME.FLAMETest do
     end)
   end

-  @tag runner: [min: 1, max: 2, max_concurrency: 2, idle_shutdown_after: 500]
+  @tag runner: [
+         min: 1,
+         max: 2,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+         idle_shutdown_after: 500
+       ]
   test "idle shutdown", %{runner_sup: runner_sup} = config do
     sim_long_running(config.test, 100)
     sim_long_running(config.test, 100)
@@ -160,7 +173,12 @@ defmodule FLAME.FLAMETest do
     Supervisor.which_children(runner_sup)
   end

-  @tag runner: [min: 1, max: 1, max_concurrency: 2, idle_shutdown_after: 500]
+  @tag runner: [
+         min: 1,
+         max: 1,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+         idle_shutdown_after: 500
+       ]
   test "pool runner DOWN exits any active checkouts", %{runner_sup: runner_sup} = config do
     {:ok, active_checkout} = sim_long_running(config.test, 10_000)
     Process.unlink(active_checkout)
@@ -170,7 +188,12 @@ defmodule FLAME.FLAMETest do
     assert_receive {:DOWN, _ref, :process, ^active_checkout, :killed}
   end

-  @tag runner: [min: 0, max: 1, max_concurrency: 2, idle_shutdown_after: 50]
+  @tag runner: [
+         min: 0,
+         max: 1,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+         idle_shutdown_after: 50
+       ]
   test "call links", %{runner_sup: runner_sup} = config do
     ExUnit.CaptureLog.capture_log(fn ->
       parent = self()
@@ -226,7 +249,12 @@ defmodule FLAME.FLAMETest do
     end)
   end

-  @tag runner: [min: 0, max: 1, max_concurrency: 2, idle_shutdown_after: 50]
+  @tag runner: [
+         min: 0,
+         max: 1,
+         strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+         idle_shutdown_after: 50
+       ]
   test "cast with link false", %{runner_sup: runner_sup} = config do
     ExUnit.CaptureLog.capture_log(fn ->
       assert Supervisor.which_children(runner_sup) == []
@@ -252,7 +280,12 @@ defmodule FLAME.FLAMETest do
   end

   describe "cast" do
-    @tag runner: [min: 1, max: 2, max_concurrency: 2, idle_shutdown_after: 500]
+    @tag runner: [
+           min: 1,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 500
+         ]
     test "normal execution", %{} = config do
       sim_long_running(config.test, 100)
       parent = self()
@@ -278,7 +311,7 @@ defmodule FLAME.FLAMETest do
     @tag runner: [
            min: 0,
            max: 2,
-           max_concurrency: 1,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 1},
            on_grow_start: &__MODULE__.growth_grow_start/1
          ]
     test "pool growth", %{} = config do
@@ -303,7 +336,12 @@ defmodule FLAME.FLAMETest do
       refute_receive {:grow_start, _}, 1000
     end

-    @tag runner: [min: 1, max: 2, max_concurrency: 2, idle_shutdown_after: 500]
+    @tag runner: [
+           min: 1,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 500
+         ]
     test "with exit and default link", %{} = config do
       ExUnit.CaptureLog.capture_log(fn ->
         Process.flag(:trap_exit, true)
@@ -327,7 +365,12 @@ defmodule FLAME.FLAMETest do
   end

   describe "process placement" do
-    @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100]
+    @tag runner: [
+           min: 0,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 100
+         ]
     test "place_child/2", %{runner_sup: runner_sup} = config do
       assert [] = Supervisor.which_children(runner_sup)
       assert {:ok, pid} = FLAME.place_child(config.test, {Agent, fn -> 1 end})
@@ -355,7 +398,12 @@ defmodule FLAME.FLAMETest do
       assert_receive {:DOWN, _ref, :process, ^runner, _}, 1000
     end

-    @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100]
+    @tag runner: [
+           min: 0,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 100
+         ]
     test "place_child links", %{runner_sup: runner_sup} = config do
       # links by default
       Process.flag(:trap_exit, true)
@@ -389,7 +437,12 @@ defmodule FLAME.FLAMETest do
       assert_receive {:DOWN, _ref, :process, ^runner, _}, 1000
     end

-    @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100]
+    @tag runner: [
+           min: 0,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 100
+         ]
     test "place_child when caller exits", %{runner_sup: runner_sup} = config do
       # links by default
       parent = self()
@@ -464,7 +517,12 @@ defmodule FLAME.FLAMETest do
       assert_receive {:DOWN, ^monitor_ref, _, _, :normal}
     end

-    @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100]
+    @tag runner: [
+           min: 0,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 100
+         ]
     test "remote without tracking", config do
       name = :"#{config.test}_trackable"
       non_trackable = URI.new!("/")
@@ -481,7 +539,12 @@ defmodule FLAME.FLAMETest do
       assert %MyTrackable{pid: nil} = map["yes"]
     end

-    @tag runner: [min: 0, max: 2, max_concurrency: 2, idle_shutdown_after: 100]
+    @tag runner: [
+           min: 0,
+           max: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
+           idle_shutdown_after: 100
+         ]
     test "remote with tracking", %{runner_sup: runner_sup} = config do
       name = :"#{config.test}_trackable"
       non_trackable = URI.new!("/")
@@ -513,7 +576,7 @@ defmodule FLAME.FLAMETest do
     @tag runner: [
            min: 0,
            max: 2,
-           max_concurrency: 2,
+           strategy: {Pool.PerRunnerMaxConcurrencyStrategy, max_concurrency: 2},
            idle_shutdown_after: 100,
            track_resources: true
          ]