Commit 5386b46: Initial release
dimitarvp committed Oct 25, 2024
Showing 15 changed files with 587 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
26 changes: 26 additions & 0 deletions .gitignore
@@ -0,0 +1,26 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
busy_bee-*.tar

# Temporary files, for example, from tests.
/tmp/
28 changes: 28 additions & 0 deletions README.md
@@ -0,0 +1,28 @@
# BusyBee

### Synopsis

This is a micro task-pool library. It provides a VM/node-wide task pool named by the user (you can run any number of pools as long as the names are unique), and its gimmick is that it keeps the worker processes alive for as long as the task pool itself (a `Supervisor`) is alive. When work needs to be done (via the `each/2` function), it spawns throwaway processes that process an input list of items with a given function, in parallel, using the workers in the pool. The number of these throwaway processes (we call them "callers") can be equal to or greater than the worker count -- but never less.

### Usage

The intended usage of this task pool is to define a module and do `use BusyBee` inside it, akin to an `Ecto.Repo`, like so:

```elixir
defmodule MyApp.SendEmails do
  use BusyBee, workers: 3, call_timeout: :infinity, shutdown_timeout: 30_000
end
```

This injects code into the `use`-ing module that provides the `Supervisor` wiring as well as the `each/2` function, which executes work in parallel using the workers in the pool.
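
Putting it together, usage might look roughly like this (a minimal sketch; the supervision
tree layout and the `send_email/1` function are illustrative and not part of the library):

```elixir
# In your application's supervision tree (e.g. MyApp.Application):
children = [
  MyApp.SendEmails
]

Supervisor.start_link(children, strategy: :one_for_one)

# Later, anywhere in your code: process a list of items in parallel,
# never occupying more than the pool's 3 workers at any given time.
emails = ["a@example.com", "b@example.com", "c@example.com", "d@example.com"]
MyApp.SendEmails.each(emails, fn email -> send_email(email) end)
```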

This library is not intended for distributed (multi-node) use; it provides no guarantees in that regard.

### Use cases

The author has successfully used this library in the following scenarios:

- At a job: to have a unique VM/node-wide task pool that keeps the load on a very limited external resource (a third-party API) small. We gave the task pool very generous timeout values and let our background workers call the pool's `each/2` function without worrying about manually throttling access to the external resource. This worked very well and let us avoid HTTP 429 errors until we eventually moved to Oban Pro.
- In a personal project: the task pool has been useful for the author's financial-trading bot experiments, as it allows processing huge amounts of data without much thought about the OTP wiring involved -- and the Elixir APIs used by this micro-library allow proper supervision and process linking, so any errors became immediately apparent and were easy to remedy.

In conclusion, I don't claim this is a groundbreakingly useful library. It was (1) half an exercise in understanding OTP and using it to do a lot of parallel work, and (2) half a way to reduce boilerplate and provide better abstractions in several projects, professional and hobby alike.
148 changes: 148 additions & 0 deletions lib/busy_bee.ex
@@ -0,0 +1,148 @@
defmodule BusyBee do
@moduledoc """
This module is meant to be `use`-d: it injects code into the using module, turning it
into a supervisor for a named task pool.
The `each/2` function deserves special attention: it processes a list with a given
function through the task pool, in parallel.
"""

alias BusyBee.{Options, Tools}

defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
use Supervisor

@name Keyword.get(opts, :name, __MODULE__)
@opts opts |> Options.new() |> Keyword.put(:name, @name)
@workers Keyword.fetch!(@opts, :workers)
@callers Keyword.fetch!(@opts, :callers)
@caller_supervisor Keyword.fetch!(@opts, :caller_supervisor)
@call_timeout Keyword.fetch!(@opts, :call_timeout)
@shutdown_timeout Keyword.fetch!(@opts, :shutdown_timeout)

@doc """
The `Supervisor` id / name of this worker pool. NOTE: not specifying a name will register
and start a global pool that *cannot* be started more than once.
Defaults to the name of the module that does `use BusyBee`.
"""
def name(), do: @name

@doc """
The number of workers this pool will distribute tasks to.
The worker processes are *always* alive; they do NOT get stopped when there is no work.
There will never be more than this many processes doing work in parallel for this pool.
Defaults to `System.schedulers_online()`.
"""
def workers(), do: @workers

@doc """
The number of throwaway processes this pool will spawn to do `GenServer.call` on the
workers. Can never be smaller than the worker count. This setting controls worker
contention: if it equals `workers`, the pool does 1:1 fully parallel execution; if there
are more callers than workers, the callers each wait their turn for a worker to run
their function.
Defaults to the worker count.
"""
def callers(), do: @callers

@doc """
The name of the `Task.Supervisor` under which the throwaway caller processes are spawned.
NOTE: if you specify this option, you must have started that supervisor yourself beforehand.
If you don't want to start your own, simply omit this option and the caller (throwaway)
processes will be started under the default supervisor this library starts.
"""
def caller_supervisor(), do: @caller_supervisor

@doc """
The caller (throwaway process) timeout. The callers do a `GenServer.call` with an
infinite timeout so the work itself is never interrupted, but the caller tasks can and
will be killed if they exceed this timeout.
Defaults to `5_000` ms.
"""
def call_timeout(), do: @call_timeout

@doc """
How many milliseconds to wait for a worker to shut down when the supervisor is stopped.
Defaults to `15_000` ms.
"""
def shutdown_timeout(), do: @shutdown_timeout

@doc """
Return the pool options.
"""
def opts(), do: @opts

def child_spec(child_spec_opts) do
# Whatever is given in the child spec (inside the app supervision tree)
# always takes precedence over the options given to the `use` macro.
start_link_opts =
Keyword.merge(@opts, child_spec_opts, fn _k, v1, v2 -> v2 || v1 end)

%{
id: @name,
restart: :permanent,
shutdown: @shutdown_timeout,
start: {__MODULE__, :start_link, [start_link_opts]},
type: :supervisor
}
end

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: @name)
end

@impl Supervisor
def init(opts) do
shutdown_timeout = Keyword.fetch!(opts, :shutdown_timeout)

children =
for i <- 1..@workers do
worker_name = Module.concat([@name, BusyBee.Worker, Integer.to_string(i)])

%{
id: worker_name,
restart: :permanent,
shutdown: shutdown_timeout,
start: {BusyBee.Worker, :start_link, [worker_name]},
type: :worker
}
end

Supervisor.init(children, strategy: :one_for_one)
end

@doc """
Process all items in the input list with the given function, using the workers
in this pool.
"""
def each(items, fun) when is_function(fun, 1) do
worker_ids = Tools.worker_ids(@name)
task_and_worker_id_pairs = Tools.zip_cycle(items, worker_ids)

Task.Supervisor.async_stream_nolink(
@caller_supervisor,
task_and_worker_id_pairs,
fn {item, worker_id} ->
# The core value proposition of this library: a VM-wide task pool.
# Calls to the same worker are serialized, so we never run more than the configured
# number of workers at the same time. There may be a huge number of callers, but
# the number of workers is limited and the callers wait their turn.
# This anonymous function is the caller.
GenServer.call(worker_id, {:run, fun, item}, :infinity)
end,
max_concurrency: @callers,
timeout: @call_timeout,
on_timeout: :kill_task
)
|> Stream.run()
end
end
end
end
15 changes: 15 additions & 0 deletions lib/busy_bee/application.ex
@@ -0,0 +1,15 @@
defmodule BusyBee.Application do
@moduledoc false

use Application

@impl true
def start(_type, _args) do
children = [
{Task.Supervisor, name: BusyBee.CallerSupervisor}
]

opts = [strategy: :one_for_one, name: BusyBee.AppSupervisor]
Supervisor.start_link(children, opts)
end
end
45 changes: 45 additions & 0 deletions lib/busy_bee/options.ex
@@ -0,0 +1,45 @@
defmodule BusyBee.Options do
@defaults [
caller_supervisor: BusyBee.CallerSupervisor,
call_timeout: 5_000,
shutdown_timeout: 15_000,
workers: System.schedulers_online(),
callers: nil,
name: BusyBee
]

@doc """
This function:
- `Keyword.merge`s the defaults with the given options;
- makes sure `:workers` is an integer >= 2 (falls back to `System.schedulers_online()` otherwise);
- makes sure the caller count is an integer that is never less than the worker count (falls back to the worker count otherwise);
- returns the modified options keyword list.
"""
def new(opts) do
# Merges the defaults with the given options and makes sure worker count is valid.
opts =
@defaults
|> Keyword.merge(opts)
|> replace_if_invalid(
:workers,
fn x -> is_integer(x) and x >= 2 end,
System.schedulers_online()
)

workers = Keyword.fetch!(opts, :workers)

# Makes sure the caller count is valid and is never less than the worker count.
replace_if_invalid(opts, :callers, fn x -> is_integer(x) and x >= workers end, workers)
end
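
# Illustration (assumed values, not from the original code): on a machine where
# `System.schedulers_online()` returns 8, the validation above behaves like this:
#
#   new(workers: 1)             # workers becomes 8 (1 < 2), callers becomes 8
#   new(workers: 4, callers: 2) # workers stays 4, callers becomes 4 (must be >= workers)
#   new(workers: 4, callers: 9) # workers stays 4, callers stays 9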

defp replace_if_invalid(opts, key, validator_fn, default_value)
when is_list(opts) and is_atom(key) and is_function(validator_fn, 1) do
value = Keyword.get(opts, key)

if validator_fn.(value) do
opts
else
Keyword.put(opts, key, default_value)
end
end
end
41 changes: 41 additions & 0 deletions lib/busy_bee/supervisor.ex
@@ -0,0 +1,41 @@
defmodule BusyBee.Supervisor do
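# Presumably the non-macro counterpart to `use BusyBee`: a standalone pool supervisor
# configured entirely through options (including `:name`), so it can be placed directly
# into a supervision tree without defining a dedicated pool module.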
use Supervisor

alias BusyBee.Options

def child_spec(opts) do
opts = Options.new(opts)
name = Keyword.fetch!(opts, :name)
shutdown_timeout = Keyword.fetch!(opts, :shutdown_timeout)

%{
id: name,
restart: :permanent,
shutdown: shutdown_timeout,
start: {__MODULE__, :start_link, [opts]},
type: :supervisor
}
end

def start_link(opts) do
opts = Options.new(opts)
name = Keyword.fetch!(opts, :name)
Supervisor.start_link(__MODULE__, opts, name: name)
end

@impl Supervisor
def init(opts) do
opts = Options.new(opts)
name = Keyword.fetch!(opts, :name)
workers = Keyword.fetch!(opts, :workers)

children =
for i <- 1..workers do
opts
|> Keyword.put(:name, Module.concat([name, BusyBee.Worker, Integer.to_string(i)]))
|> BusyBee.Worker.child_spec()
end

Supervisor.init(children, strategy: :one_for_one)
end
end
47 changes: 47 additions & 0 deletions lib/busy_bee/tools.ex
@@ -0,0 +1,47 @@
defmodule BusyBee.Tools do
@doc """
Finds the identifiers of all workers in a pool with the specified name.
Each of the identifiers can be used as a first argument to `GenServer.call`.
"""
def worker_ids(name) do
name
|> Supervisor.which_children()
|> Enum.filter(&valid_worker?/1)
|> Enum.map(&worker_id/1)
end
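
# Illustration (assumed from the wiring in `BusyBee.__using__/1`): a pool named
# `MyApp.SendEmails` yields worker IDs like `MyApp.SendEmails.BusyBee.Worker.1`,
# which the injected `each/2` then uses as `GenServer.call/3` targets.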

@doc """
Checks whether a value returned by `Supervisor.which_children/1` is a valid worker, i.e. one with
a non-`nil`, non-`:undefined` ID and of the `:worker` type.
"""
def valid_worker?({nil, _pid, _type, _modules}), do: false
def valid_worker?({:undefined, _pid, _type, _modules}), do: false
def valid_worker?({_id, _pid, :worker, _modules}), do: true
def valid_worker?(_), do: false

@doc """
Extracts the ID from a value returned by `Supervisor.which_children/1`, but only if it's a worker.
Returns `nil` on any other input shape.
"""
def worker_id({id, _pid, :worker, _modules}), do: id
def worker_id(_), do: nil

@doc """
Similar to `Enum.zip/2` but cycles through the second list when it runs out; for example
`zip_cycle([1, 2, 3, 4], [:x, :y, :z])` yields `[{1, :x}, {2, :y}, {3, :z}, {4, :x}]`.
It never produces more items than the length of the first list.
"""
def zip_cycle([h0 | t0] = _l0, [h1 | t1] = l1),
do: zip_cycle(t0, t1, l1, [{h0, h1}])

def zip_cycle(_l0, _l1), do: []

defp zip_cycle([h0 | t0], [h1 | t1], l1, acc),
do: zip_cycle(t0, t1, l1, [{h0, h1} | acc])

defp zip_cycle([h0 | t0], [], [h1 | t1] = l1, acc),
do: zip_cycle(t0, t1, l1, [{h0, h1} | acc])

defp zip_cycle([], _, _, acc),
do: :lists.reverse(acc)
end
(The remaining 7 changed files are not shown.)
