diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e4dd1d5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +busy_bee-*.tar + +# Temporary files, for example, from tests. +/tmp/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..9790119 --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +# BusyBee + +### Synopsis + +This is a micro task pool library that provides a unique VM/node-wide task pool named by the user (so you can have any number of them as long as the names are unique) whose gimmick is that it keeps the worker processes alive for as long as the task pool itself (a `Supervisor`) is alive. When work needs to be done (via the `each/2` function) it spawns throwaway processes to process an input list of items with a given function, using the workers in the pool, in parallel. The amount of these throwaway processes (we call them "callers") can be equal to or more than the worker count -- but never less. + +### Usage + +The intended usage of this task pool is to define a module and do `use BusyBee` inside it, akin to an `Ecto.Repo`, like so: + +``` +defmodule MyApp.SendEmails do + use BusyBee, workers: 3, call_timeout: :infinity, shutdown_timeout: :30_000 +end +``` + +This injects a bunch of code in this `use`-ing module that provides `Supervisor` wiring and also the `each/2` function that provides parallel execution of work utilizing the workers in the pool, in parallel. + +This library is not intended for distribution; it provides no guarantees in that regard. + +### Use cases + +The author has successfully used this library in the following scenarios: + +- In a job: to have an unique VM/node-wide task pool in order to ensure small load to a very limited external resource (a 3rd party API). We have given very generous timeout values to the task pool and just let our background workers call the pool's `each/2` function without worrying about manually throttling the access to the external resource. This has worked very well and allowed us to avoid HTTP 429 errors until we later eventually moved to Oban Pro. +- In a personal project: this task pool has been useful for the author's financial trading bot experiments as it allows processing of huge amounts of data pieces without much considerations of the OTP wiring involved -- and the Elixir API used by this micro library allows proper supervision and process linking so any errors have become immediately apparent and were easy to remedy. + +In conclusion, I don't claim that this is a game-breakingly useful library. It was half (1) an exercise in understanding and utilizing OTP to the best of its abilities to do a lot of parallel work, and (2) half a way to reduce code boilerplate and provide better abstractions in several projects, professional and hobby alike. diff --git a/lib/busy_bee.ex b/lib/busy_bee.ex new file mode 100644 index 0000000..4594d02 --- /dev/null +++ b/lib/busy_bee.ex @@ -0,0 +1,148 @@ +defmodule BusyBee do + @moduledoc """ + This is a module that's meant to be `use`-d so it injects code in the using module; + it allows the using module to be a supervisor for a named task pool. + + Special attention deserves the `each` function that allows a list be processed + with a given function through the task pool, in parallel. + """ + + alias BusyBee.{Options, Tools} + + defmacro __using__(opts) do + quote bind_quoted: [opts: opts] do + use Supervisor + + @name Keyword.get(opts, :name, __MODULE__) + @opts opts |> Options.new() |> Keyword.put(:name, @name) + @workers Keyword.fetch!(@opts, :workers) + @callers Keyword.fetch!(@opts, :callers) + @caller_supervisor Keyword.fetch!(@opts, :caller_supervisor) + @call_timeout Keyword.fetch!(@opts, :call_timeout) + @shutdown_timeout Keyword.fetch!(@opts, :shutdown_timeout) + + @doc """ + The `Supervisor` id / name of this worker pool. NOTE: not specifying a name will register + and start a global pool that *cannot* be started more than once. + + Defaults to the name of the module that does `use BusyBee`. + """ + def name(), do: @name + + @doc """ + The amount of workers this pool will distribute tasks to. + The worker processes are *always* live; they do NOT get stopped when there is no work. + There will never be more than this amount of processes doing work in parallel for this pool. + + Defaults to `System.schedulers_online()`. + """ + def workers(), do: @workers + + @doc """ + The amount of throwaway processes that this pool will spawn that will do `GenServer.call` + on the workers. Can never be smaller than the worker count. This setting controls worker + contention; if it's the same value as `workers` then the pool is doing 1:1 full parallel + execution. If the callers are more than the workers then the callers will each wait their + turn to receive a designated worker that calls their function. + + Defaults to the worker count. + """ + def callers(), do: @callers + + @doc """ + The name of the `Task.Supervisor` under which you want the throwaway caller processes + to be spawned. NOTE: you have to have started this supervisor on your own beforehand. + + This library starts a default one if you don't want to start your own; so just omit + this option and the caller (throwaway) processes will be started under it. + """ + def caller_supervisor(), do: @caller_supervisor + + @doc """ + The caller (throwaway process) timeout; the callers do a `GenServer.call` with + infinite timeout so the work itself will not be disrupted but the callers can and will + be killed if they exceed this timeout. + + Defaults to `5_000` ms. + """ + def call_timeout(), do: @call_timeout + + @doc """ + How much milliseconds to wait for a worker to shutdown when the supervisor is stopped. + + Defaults to `15_000` ms. + """ + def shutdown_timeout(), do: @shutdown_timeout + + @doc """ + Return the pool options. + """ + def opts(), do: @opts + + def child_spec(child_spec_opts) do + # Whatever is given in the childspec (inside the app supervision tree) + # always takes a precendence over the options given to the `use` macro. + start_link_opts = + Keyword.merge(@opts, child_spec_opts, fn _k, v1, v2 -> v2 || v1 end) + + %{ + id: @name, + restart: :permanent, + shutdown: @shutdown_timeout, + start: {__MODULE__, :start_link, [start_link_opts]}, + type: :supervisor + } + end + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: @name) + end + + @impl Supervisor + def init(opts) do + shutdown_timeout = Keyword.fetch!(opts, :shutdown_timeout) + + children = + for i <- 1..@workers do + worker_name = Module.concat([@name, BusyBee.Worker, Integer.to_string(i)]) + + %{ + id: worker_name, + restart: :permanent, + shutdown: shutdown_timeout, + start: {BusyBee.Worker, :start_link, [worker_name]}, + type: :worker + } + end + + Supervisor.init(children, strategy: :one_for_one) + end + + @doc """ + Process all items in the input list with the given function and using the workers + in this pool. + """ + def each(items, fun) when is_function(fun, 1) do + worker_ids = Tools.worker_ids(@name) + task_and_worker_id_pairs = Tools.zip_cycle(items, worker_ids) + + Task.Supervisor.async_stream_nolink( + @caller_supervisor, + task_and_worker_id_pairs, + fn {item, worker_id} -> + # The core value proposition of this library: namely a VM-wide task pool. + # We serialize calls to the same worker so we never run more than the configured + # amount of them at the same time. We might have a huge amount of callers but the + # amount of workers is limited and callers will wait their turn. + # This here is the caller. + GenServer.call(worker_id, {:run, fun, item}, :infinity) + end, + max_concurrency: @callers, + timeout: @call_timeout, + on_timeout: :kill_task + ) + |> Stream.run() + end + end + end +end diff --git a/lib/busy_bee/application.ex b/lib/busy_bee/application.ex new file mode 100644 index 0000000..c9e47ca --- /dev/null +++ b/lib/busy_bee/application.ex @@ -0,0 +1,15 @@ +defmodule BusyBee.Application do + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + children = [ + {Task.Supervisor, name: BusyBee.CallerSupervisor} + ] + + opts = [strategy: :one_for_one, name: BusyBee.AppSupervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/lib/busy_bee/options.ex b/lib/busy_bee/options.ex new file mode 100644 index 0000000..b569d80 --- /dev/null +++ b/lib/busy_bee/options.ex @@ -0,0 +1,45 @@ +defmodule BusyBee.Options do + @defaults [ + caller_supervisor: BusyBee.CallerSupervisor, + call_timeout: 5_000, + shutdown_timeout: 15_000, + workers: System.schedulers_online(), + callers: nil, + name: BusyBee + ] + + @doc """ + This function: + - `Keyword.merge`s the defaults with the given options; + - Makes sure that `:workers` and `:callers` are >= 2 (uses `System.schedulers_online()` if not); + - Makes sure that caller count is never less than worker count; + - Returns the modified options keyword list. + """ + def new(opts) do + # Merges the defaults with the given options and makes sure worker count is valid. + opts = + @defaults + |> Keyword.merge(opts) + |> replace_if_invalid( + :workers, + fn x -> is_integer(x) and x >= 2 end, + System.schedulers_online() + ) + + workers = Keyword.fetch!(opts, :workers) + + # Makes sure the caller count is valid and is never less than the worker count. + replace_if_invalid(opts, :callers, fn x -> is_integer(x) and x >= workers end, workers) + end + + defp replace_if_invalid(opts, key, validator_fn, default_value) + when is_list(opts) and is_atom(key) and is_function(validator_fn, 1) do + value = Keyword.get(opts, key) + + if validator_fn.(value) do + opts + else + Keyword.put(opts, key, default_value) + end + end +end diff --git a/lib/busy_bee/supervisor.ex b/lib/busy_bee/supervisor.ex new file mode 100644 index 0000000..b68d49a --- /dev/null +++ b/lib/busy_bee/supervisor.ex @@ -0,0 +1,41 @@ +defmodule BusyBee.Supervisor do + use Supervisor + + alias BusyBee.Options + + def child_spec(opts) do + opts = Options.new(opts) + name = Keyword.fetch!(opts, :name) + shutdown_timeout = Keyword.fetch!(opts, :shutdown_timeout) + + %{ + id: name, + restart: :permanent, + shutdown: shutdown_timeout, + start: {__MODULE__, :start_link, [opts]}, + type: :supervisor + } + end + + def start_link(opts) do + opts = Options.new(opts) + name = Keyword.fetch!(opts, :name) + Supervisor.start_link(__MODULE__, opts, name: name) + end + + @impl Supervisor + def init(opts) do + opts = Options.new(opts) + name = Keyword.fetch!(opts, :name) + workers = Keyword.fetch!(opts, :workers) + + children = + for i <- 1..workers do + opts + |> Keyword.put(:name, Module.concat([name, BusyBee.Worker, Integer.to_string(i)])) + |> BusyBee.Worker.child_spec() + end + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/busy_bee/tools.ex b/lib/busy_bee/tools.ex new file mode 100644 index 0000000..23e1e64 --- /dev/null +++ b/lib/busy_bee/tools.ex @@ -0,0 +1,47 @@ +defmodule BusyBee.Tools do + @doc """ + Finds the identifiers of all workers in a pool with the specified name. + Each of the identifiers can be used as a first argument to `GenServer.call`. + """ + def worker_ids(name) do + name + |> Supervisor.which_children() + |> Enum.filter(&valid_worker?/1) + |> Enum.map(&worker_id/1) + end + + @doc """ + Checks if a value returned by `Supervisor.which_children/1` is a valid worker, i.e. one with + non-`nil` and non-`:unfedined` ID and of the `:worker` type. + """ + def valid_worker?({nil, _pid, _type, _modules}), do: false + def valid_worker?({:undefined, _pid, _type, _modules}), do: false + def valid_worker?({_id, _pid, :worker, _modules}), do: true + def valid_worker?(_), do: false + + @doc """ + Extracts an ID from a value returned by `Supervisor.which_children/1` and only if it's a worker. + Returns `nil` on any other input shape. + """ + def worker_id({id, _pid, :worker, _modules}), do: id + def worker_id(_), do: nil + + @doc """ + Similar to `Enum.zip/2` but also wraps around the second list argument f.ex. + `zip_cycle([1, 2, 3, 4], [:x, :y, :z])` yields `[{1, :x}, {2, :y}, {3, :z}, {4, :x}]`. + It never produces more items than the length of the first list. + """ + def zip_cycle([h0 | t0] = _l0, [h1 | t1] = l1), + do: zip_cycle(t0, t1, l1, [{h0, h1}]) + + def zip_cycle(_l0, _l1), do: [] + + defp zip_cycle([h0 | t0], [h1 | t1], l1, acc), + do: zip_cycle(t0, t1, l1, [{h0, h1} | acc]) + + defp zip_cycle([h0 | t0], [], [h1 | t1] = l1, acc), + do: zip_cycle(t0, t1, l1, [{h0, h1} | acc]) + + defp zip_cycle([], _, _, acc), + do: :lists.reverse(acc) +end diff --git a/lib/busy_bee/worker.ex b/lib/busy_bee/worker.ex new file mode 100644 index 0000000..3185839 --- /dev/null +++ b/lib/busy_bee/worker.ex @@ -0,0 +1,38 @@ +defmodule BusyBee.Worker do + @doc """ + This is a `GenServer` that accepts a function with a single argument and executes it. + """ + + use GenServer + + # Server (callbacks) + + @impl GenServer + def init(_state) do + {:ok, _state = []} + end + + @impl GenServer + def handle_call({:run, fun, arg}, _from, _state = []) when is_function(fun, 1) do + {:reply, _return_value = fun.(arg), _state = []} + end + + # Client + + def child_spec(opts) do + name = Keyword.get(opts, :name, __MODULE__) + shutdown_timeout = Keyword.get(opts, :shutdown_timeout, 15_000) + + %{ + id: name, + restart: :permanent, + shutdown: shutdown_timeout, + start: {__MODULE__, :start_link, [name]}, + type: :worker + } + end + + def start_link(name) do + GenServer.start_link(__MODULE__, _state = [], name: name) + end +end diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..387fb97 --- /dev/null +++ b/mix.exs @@ -0,0 +1,24 @@ +defmodule BusyBee.MixProject do + use Mix.Project + + def project do + [ + app: :busy_bee, + version: "0.1.0", + elixir: "~> 1.13", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + def application do + [ + extra_applications: [:logger], + mod: {BusyBee.Application, []} + ] + end + + defp deps do + [] + end +end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..0ac823b --- /dev/null +++ b/mix.lock @@ -0,0 +1,2 @@ +%{ +} diff --git a/test/busy_bee_test.exs b/test/busy_bee_test.exs new file mode 100644 index 0000000..6f12040 --- /dev/null +++ b/test/busy_bee_test.exs @@ -0,0 +1,30 @@ +defmodule BusyBeeTest do + use ExUnit.Case + doctest BusyBee + + test "check if options are passed and stored correctly" do + defmodule TestPool_0 do + use BusyBee, + workers: 3, + callers: 5, + call_timeout: 6_000, + shutdown_timeout: 30_000 + end + + assert TestPool_0.opts() == [ + name: BusyBeeTest.TestPool_0, + caller_supervisor: BusyBee.CallerSupervisor, + workers: 3, + callers: 5, + call_timeout: 6000, + shutdown_timeout: 30000 + ] + + assert TestPool_0.name() == BusyBeeTest.TestPool_0 + assert TestPool_0.caller_supervisor() == BusyBee.CallerSupervisor + assert TestPool_0.workers() == 3 + assert TestPool_0.callers() == 5 + assert TestPool_0.call_timeout() == 6000 + assert TestPool_0.shutdown_timeout() == 30000 + end +end diff --git a/test/options_test.exs b/test/options_test.exs new file mode 100644 index 0000000..6b77a12 --- /dev/null +++ b/test/options_test.exs @@ -0,0 +1,59 @@ +defmodule OptionsTest do + use ExUnit.Case + doctest BusyBee.Options + alias BusyBee.Options + + @default_workers System.schedulers_online() + + describe "worker count" do + test "value of nil yields a default" do + assert Options.new(workers: nil) |> Keyword.get(:workers) == @default_workers + end + + test "value of zero yields a default" do + assert Options.new(workers: 0) |> Keyword.get(:workers) == @default_workers + end + + test "value of one yields a default" do + assert Options.new(workers: 1) |> Keyword.get(:workers) == @default_workers + end + + test "value of two works" do + assert Options.new(workers: 2) |> Keyword.get(:workers) == 2 + end + + test "wrong type value yields a default" do + assert Options.new(workers: "x") |> Keyword.get(:workers) == @default_workers + end + end + + describe "caller count" do + test "value of nil yields a default" do + assert Options.new(callers: nil) |> Keyword.get(:callers) == @default_workers + end + + test "value of zero yields a default" do + assert Options.new(callers: 0) |> Keyword.get(:callers) == @default_workers + end + + test "value of one yields a default" do + assert Options.new(callers: 1) |> Keyword.get(:callers) == @default_workers + end + + test "value of two works" do + assert Options.new(workers: 2, callers: 2) |> Keyword.get(:callers) == 2 + end + + test "wrong type value yields a default" do + assert Options.new(callers: "x") |> Keyword.get(:callers) == @default_workers + end + + test "can never be less than the worker count" do + assert Options.new(workers: 3, callers: 2) |> Keyword.get(:callers) == 3 + end + + test "default is equal to worker count" do + assert Options.new(workers: 7) |> Keyword.get(:callers) == 7 + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start() diff --git a/test/tools_test.exs b/test/tools_test.exs new file mode 100644 index 0000000..e92691c --- /dev/null +++ b/test/tools_test.exs @@ -0,0 +1,79 @@ +defmodule BusyBee.ToolsTest do + use ExUnit.Case + doctest BusyBee.Tools + + alias BusyBee.Tools + + describe "zip_cycle/2" do + test "two empty lists yield an empty list" do + assert Tools.zip_cycle([], []) == [] + end + + test "two lists with single elements work" do + assert Tools.zip_cycle([1], [2]) == [{1, 2}] + end + + test "does not wrap around the first list" do + assert Tools.zip_cycle([1, 2], [:a, :b, :c]) == [{1, :a}, {2, :b}] + end + + test "wraps around the second list" do + assert Tools.zip_cycle([1, 2, 3], [:a, :b]) == [{1, :a}, {2, :b}, {3, :a}] + end + + test "returns empty list on invalid input" do + assert [] == Tools.zip_cycle(nil, :huh) + assert [] == Tools.zip_cycle("hello", 123.456) + assert [] == Tools.zip_cycle(false, <<11, 17, 98>>) + end + end + + describe "worker_id/1" do + test "finds the ID of a valid worker spec" do + assert Tools.worker_id({:a_name, 123, :worker, [List, Enum]}) == :a_name + end + + test "returns nil when given an invalid worker spec" do + assert nil == Tools.worker_id({:a_name, "other", :stuff, "here"}) + end + end + + describe("valid_worker?/1") do + test "nil id yields false" do + refute Tools.valid_worker?({nil, 123, :supervisor, [List, Enum]}) + end + + test ":undefined id yields false" do + refute Tools.valid_worker?({:undefined, 123, :worker, [List, Enum]}) + end + + test "valid worker id yields true" do + assert Tools.valid_worker?({:a, 123, :worker, [List, Enum]}) + end + + test "valid supervisor id yields false" do + refute Tools.valid_worker?({:a, 123, :supervisor, [List, Enum]}) + end + + test "non-tuple value yields false" do + refute Tools.valid_worker?([:whatever, 123, "hello"]) + end + end + + describe "worker_ids/1" do + test "can find the worker IDs of a successfully started pool" do + spec = + {BusyBee.Supervisor, + name: Whatever, workers: 3, callers: 5, spawn_timeout: 5_000, shutdown_timeout: 30_000} + + pool = start_supervised!(spec) + assert is_pid(pool) + + assert Whatever |> Tools.worker_ids() |> Enum.sort() == [ + Module.concat([Whatever, BusyBee.Worker, "1"]), + Module.concat([Whatever, BusyBee.Worker, "2"]), + Module.concat([Whatever, BusyBee.Worker, "3"]) + ] + end + end +end