Skip to content

Commit

Permalink
Support customizing how the work list is handled
Browse files Browse the repository at this point in the history
  • Loading branch information
jfcloutier authored and fhunleth committed Jan 11, 2022
1 parent b1251c1 commit 9ca6c0f
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 226 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ service and time to live.

<!-- MDOC !-->

## Configuration

Jackalope puts the publish commands on a work list before it sends them to Tortoise311.
The work list moves the commands from waiting to be sent, to pending (sent and waiting for a response),
to discarded when confirmed by Tortoise311 as processed or when they are expired.

The work list has a maximum size which defaults to 100. Only a maximum number of publish commands
can wait, should Tortoise311 be temporarily disconnected, to be forwarded to Tortoise311.

You can set the Jackalope.start_link/1 `:work_list_mod` option to the desired work list implementation.
See the documentation for module `Jackalope`.

<!-- MDOC !-->

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be
Expand Down
14 changes: 12 additions & 2 deletions lib/jackalope.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Jackalope do
}

@default_max_work_list_size 100
@default_work_list_module Jackalope.TransientWorkList

@doc """
Start a Jackalope session
Expand Down Expand Up @@ -74,6 +75,11 @@ defmodule Jackalope do
customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
]
- `work_list_mod` names the module implementing the Jackalope WorkList protocol that will be used to manage
the publish commands sent to Tortoise by the Jackalope Session.
The module must also implement the function `@spec new(function(), function(), non_neg_integer(), Keyword.t()) :: any()`.
See Jackalope.TransientWorkList (the default) for examples.
- `max_work_list_size` (default: #{@default_max_work_list_size}) specifies the maximum
number of unexpired work orders Jackalope will retain in its work list
(the commands yet to be sent to the MQTT server). When the maximum is
Expand Down Expand Up @@ -113,18 +119,22 @@ defmodule Jackalope do
jackalope_handler = Keyword.get(opts, :handler, Jackalope.Handler.Logger)
max_work_list_size = Keyword.get(opts, :max_work_list_size, @default_max_work_list_size)

work_list_mod = Keyword.get(opts, :work_list_mod, @default_work_list_module)

children = [
{Jackalope.Session,
[
handler: jackalope_handler,
max_work_list_size: max_work_list_size
max_work_list_size: max_work_list_size,
work_list_mod: work_list_mod
]},
{Jackalope.Supervisor,
[
handler: jackalope_handler,
client_id: client_id,
connection_options: connection_options(opts),
last_will: Keyword.get(opts, :last_will)
last_will: Keyword.get(opts, :last_will),
work_list_mod: work_list_mod
]}
]

Expand Down
85 changes: 48 additions & 37 deletions lib/jackalope/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Jackalope.Session do

alias __MODULE__, as: State
alias Jackalope.{TortoiseClient, WorkList}
alias Jackalope.WorkList.Expiration

require Logger

Expand Down Expand Up @@ -73,8 +74,15 @@ defmodule Jackalope.Session do
def init(opts) do
handler = Keyword.fetch!(opts, :handler)
max_work_list_size = Keyword.fetch!(opts, :max_work_list_size)
work_list_mod = Keyword.fetch!(opts, :work_list_mod)

work_list = WorkList.new([], max_work_list_size)
work_list =
work_list_mod.new(
fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end,
fn {cmd, opts}, expiration -> {cmd, Keyword.put(opts, :expiration, expiration)} end,
max_work_list_size,
opts
)

initial_state = %State{
work_list: work_list,
Expand Down Expand Up @@ -118,7 +126,7 @@ defmodule Jackalope.Session do

state = %State{
state
| work_list: WorkList.push(state.work_list, work_item)
| work_list: WorkList.push(work_list, work_item)
}

{:noreply, state}
Expand All @@ -132,7 +140,7 @@ defmodule Jackalope.Session do
# to stay in the work list before it is deemed irrelevant
ttl = Keyword.get(opts, :ttl, @default_ttl_msecs)

expiration = WorkList.expiration(ttl)
expiration = Expiration.expiration(ttl)

# Note that we don't really concern ourselves with the order of
# the commands; the work_list is a list (and thus a stack) and when
Expand Down Expand Up @@ -172,41 +180,42 @@ defmodule Jackalope.Session do
work_list: work_list
} = state
) do
if WorkList.empty?(work_list) do
{:noreply, state}
else
{cmd, opts} = WorkList.peek(work_list)
expiration = Keyword.fetch!(opts, :expiration)

if expiration > WorkList.expiration(0) do
case execute_work(cmd) do
:ok ->
# fire and forget work; Publish with QoS=0 is among the work
# that doesn't produce references
{:noreply, %State{state | work_list: WorkList.pop(work_list)},
{:continue, :consume_work_list}}

{:ok, ref} ->
state = %State{
state
| work_list: WorkList.pending(work_list, ref)
}

{:noreply, state, {:continue, :consume_work_list}}

{:error, reason} ->
Logger.warn("[Jackalope] Temporarily failed to execute #{inspect(cmd)}: #{reason}")
{:noreply, state}
end
else
# drop the message, it is outside of the time to live
if function_exported?(state.handler, :handle_error, 1) do
reason = {:publish_error, cmd, :ttl_expired}
state.handler.handle_error(reason)
end
case WorkList.peek(work_list) do
nil ->
{:noreply, state}

{:noreply, state, {:continue, :consume_work_list}}
end
{cmd, opts} ->
expiration = Keyword.fetch!(opts, :expiration)

if expired?(expiration) do
# drop the message, it is outside of the time to live
if function_exported?(state.handler, :handle_error, 1) do
reason = {:publish_error, cmd, :ttl_expired}
state.handler.handle_error(reason)
end

{:noreply, state, {:continue, :consume_work_list}}
else
case execute_work(cmd) do
:ok ->
# fire and forget work; Publish with QoS=0 is among the work
# that doesn't produce references
{:noreply, %State{state | work_list: WorkList.pop(work_list)},
{:continue, :consume_work_list}}

{:ok, ref} ->
state = %State{
state
| work_list: WorkList.pending(work_list, ref)
}

{:noreply, state, {:continue, :consume_work_list}}

{:error, reason} ->
Logger.warn("[Jackalope] Temporarily failed to execute #{inspect(cmd)}: #{reason}")
{:noreply, state}
end
end
end
end

Expand All @@ -215,4 +224,6 @@ defmodule Jackalope.Session do
defp execute_work({:publish, topic, payload, opts}) do
TortoiseClient.publish(topic, payload, opts)
end

defp expired?(expiration), do: Expiration.after?(Expiration.expiration(0), expiration)
end
188 changes: 188 additions & 0 deletions lib/jackalope/transient_work_list.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
defmodule Jackalope.TransientWorkList do
@moduledoc """
A simple work list that does not survive across reboots.
"""

alias Jackalope.WorkList.Expiration

require Logger
@default_max_size 100

defstruct items: [],
pending: %{},
max_size: @default_max_size,
expiration_fn: nil

@type t() :: %__MODULE__{items: list(), max_size: non_neg_integer()}

@doc "Create a new work list"
@spec new(function(), function(), non_neg_integer(), Keyword.t()) :: t()
def new(expiration_fn, _update_expiration_fn, max_size \\ @default_max_size, _opts \\ [])
when max_size > 0 do
%__MODULE__{max_size: max_size, expiration_fn: expiration_fn}
end

@doc false
@spec prepend(t(), list()) :: t()
def prepend(work_list, items) when is_list(items) do
updated_items =
(items ++ work_list.items)
|> bound_work_items(work_list)

%__MODULE__{work_list | items: updated_items}
end

@doc false
@spec bound_pending_items(any, atom | t()) :: any
def bound_pending_items(pending, work_list) do
if Enum.count(pending) > work_list.max_size do
# Trim expired pending requests
kept_pairs =
Enum.reduce(
pending,
[],
fn {ref, item}, acc ->
if Expiration.unexpired?(item, work_list.expiration_fn) do
[{ref, item} | acc]
else
acc
end
end
)

# If still over maximum, remove the oldest pending request (expiration is smallest)
if Enum.count(kept_pairs) > work_list.max_size do
[{ref, item} | newer_pairs] =
Enum.sort(kept_pairs, fn {_, item1}, {_, item2} ->
work_list.expiration_fn.(item1) < work_list.expiration_fn.(item2)
end)

Logger.info(
"[Jackalope] Maximum number of unexpired pending requests reached. Dropping #{inspect(item)}:#{inspect(ref)}."
)

newer_pairs
else
kept_pairs
end
|> Enum.into(%{})
else
pending
end
end

@doc false
@spec bound_work_items([any()], t()) :: [any()]
def bound_work_items(items, work_list) do
current_size = length(items)

if current_size <= work_list.max_size do
items
else
Logger.warn(
"[Jackalope] The worklist exceeds #{work_list.max_size} (#{current_size}). Looking to shrink it."
)

{active_items, deleted_count} = remove_expired_work_items(items, work_list.expiration_fn)

active_size = current_size - deleted_count

if active_size <= work_list.max_size do
active_items
else
{dropped, cropped_list} = List.pop_at(active_items, active_size - 1)

Logger.warn("[Jackalope] Dropped #{inspect(dropped)} from oversized work list")

cropped_list
end
end
end

defp remove_expired_work_items(items, expiration_fn) do
{list, count} =
Enum.reduce(
items,
{[], 0},
fn item, {active_list, deleted_count} ->
if Expiration.unexpired?(item, expiration_fn),
do: {[item | active_list], deleted_count},
else: {active_list, deleted_count + 1}
end
)

{Enum.reverse(list), count}
end
end

defimpl Jackalope.WorkList, for: Jackalope.TransientWorkList do
alias Jackalope.TransientWorkList
require Logger

@impl Jackalope.WorkList
def push(work_list, item) do
updated_items =
[item | work_list.items]
|> TransientWorkList.bound_work_items(work_list)

%TransientWorkList{work_list | items: updated_items}
end

@impl Jackalope.WorkList
def peek(work_list) do
List.first(work_list.items)
end

@impl Jackalope.WorkList
def pop(work_list) do
%TransientWorkList{work_list | items: tl(work_list.items)}
end

@impl Jackalope.WorkList
def pending(work_list, ref) do
item = hd(work_list.items)

updated_pending =
Map.put(work_list.pending, ref, item) |> TransientWorkList.bound_pending_items(work_list)

%TransientWorkList{
work_list
| items: tl(work_list.items),
pending: updated_pending
}
end

@impl Jackalope.WorkList
def reset_pending(work_list) do
pending_items = Map.values(work_list.pending)

TransientWorkList.prepend(%TransientWorkList{work_list | pending: %{}}, pending_items)
end

@impl Jackalope.WorkList
def done(work_list, ref) do
{item, pending} = Map.pop(work_list.pending, ref)
# item can be nil
{%TransientWorkList{work_list | pending: pending}, item}
end

@impl Jackalope.WorkList
def count(work_list) do
length(work_list.items)
end

@impl Jackalope.WorkList
def count_pending(work_list) do
Enum.count(work_list.pending)
end

@impl Jackalope.WorkList
def empty?(work_list) do
work_list.items == []
end

@impl Jackalope.WorkList
def remove_all(work_list) do
%TransientWorkList{work_list | items: [], pending: %{}}
end
end
Loading

0 comments on commit 9ca6c0f

Please sign in to comment.