Skip to content

Commit

Permalink
Merge pull request #290 from rabbitmq/md/update-projections-on-snapsh…
Browse files Browse the repository at this point in the history
…ot-install
  • Loading branch information
the-mikedavis authored Sep 4, 2024
2 parents b802393 + 7dde526 commit 1ad4a86
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 56 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% vim:ft=erlang:
{minimum_otp_vsn, "24.0"}.

{deps, [{ra, "2.13.6"},
{deps, [{ra, "2.14.0"},
{horus, "0.3.0"}]}.

{project_plugins, [rebar3_proper,
Expand Down
6 changes: 3 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
[{<<"aten">>,{pkg,<<"aten">>,<<"0.6.0">>},1},
{<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.8">>},1},
{<<"horus">>,{pkg,<<"horus">>,<<"0.3.0">>},0},
{<<"ra">>,{pkg,<<"ra">>,<<"2.13.6">>},0},
{<<"ra">>,{pkg,<<"ra">>,<<"2.14.0">>},0},
{<<"seshat">>,{pkg,<<"seshat">>,<<"0.6.0">>},1}]}.
[
{pkg_hash,[
{<<"aten">>, <<"7A57B275A6DAF515AC3683FB9853E280B4D0DCDD74292FD66AC4A01C8694F8C7">>},
{<<"gen_batch_server">>, <<"7840A1FA63EE1EFFC83E8A91D22664847A2BA1192D30EAFFFD914ACB51578068">>},
{<<"horus">>, <<"0AAB22D88FA8159FFB13F83A782B0D9B0819C08DAFD4A4C4BEEA0C31F32B626A">>},
{<<"ra">>, <<"D0B2A33D38C2B20BCB7C898C6DB882BFE73B402B9B21FE08A8454BBD1A83E663">>},
{<<"ra">>, <<"AEB630926AE0D4A6EDE62C12CC4C3389DDD4E18B5E9CFB84369251B69FA9D257">>},
{<<"seshat">>, <<"3172EB1D7A2A4F66108CD6933A4E465AFF80F84AA90ED83F047B92F636123CCD">>}]},
{pkg_hash_ext,[
{<<"aten">>, <<"5F39A164206AE3F211EF5880B1F7819415686436E3229D30B6A058564FBAA168">>},
{<<"gen_batch_server">>, <<"C3E6A1A2A0FB62AEE631A98CFA0FD8903E9562422CBF72043953E2FB1D203017">>},
{<<"horus">>, <<"DABE0BD901159D31ADA944310302A155324FE421ADCB12BFC4BB7EABDD9625F8">>},
{<<"ra">>, <<"0BE7645DCE4A76EDD4C4642D0FA69639518C72B6B60A34FC86590D1909166AEB">>},
{<<"ra">>, <<"1D553DD971A0B398B7AF0FA8C8458DDA575715FF71C65C972E9500B24039B240">>},
{<<"seshat">>, <<"7CEF700F92831DD7CAE6A6DD223CCC55AC88ECCE0631EE9AB0F2B5FB70E79B90">>}]}
].
150 changes: 145 additions & 5 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
handle_aux/6,
apply/3,
state_enter/2,
snapshot_installed/2,
snapshot_installed/4,
overview/1,
version/0,
which_module/1]).
Expand Down Expand Up @@ -1565,9 +1565,23 @@ state_enter(_StateName, _State) ->

%% @private

snapshot_installed(_Meta, _State) ->
SideEffect = {aux, restore_projections},
[SideEffect].
snapshot_installed(
#{machine_version := NewMacVer}, NewState,
#{machine_version := OldMacVer}, OldState) ->
%% A snapshot might be installed on a follower member who has fallen
%% sufficiently far behind in replication of the log from the leader. When
%% a member installs a snapshot it needs to update its projections: new
%% projections may have been registered since the snapshot or old ones
%% unregistered. Projections which did not change need to be triggered
%% with the new changes to state, similar to the `restore_projections' aux
%% effect. Also see `update_projections/2'.
%%
%% Note that the snapshot installation might bump the effective machine
%% version so we need to convert the old state to the new machine version.
OldState1 = convert_state(OldState, OldMacVer, NewMacVer),
ok = update_projections(OldState1, NewState),
ok = clear_compiled_projection_tree(),
[].

%% @private

Expand Down Expand Up @@ -1828,7 +1842,7 @@ create_projection_side_effects2(
delete ->
InitialTree
end,
khepri_pattern_tree:fold(
khepri_pattern_tree:fold_matching(
ProjectionTree,
PatternMatchingTree,
Path,
Expand Down Expand Up @@ -2331,3 +2345,129 @@ convert_state(State, 0, 1) ->
State1 = list_to_tuple(Fields1),
?assert(is_state(State1)),
State1.

-spec update_projections(OldState, NewState) -> ok when
OldState :: khepri_machine:state(),
NewState :: khepri_machine:state().
%% @doc Updates the machine's projections to account for changes between two
%% states.
%%
%% This is used when installing a projection - the state will jump from the
%% given `OldState' before the snapshot was installed to the given `NewState'
%% after. When we swap states we need to update the projections: the records
%% in the projection tables themselves but also which projection tables exist.
%% The changes glossed over by the snapshot may include projection
%% registrations and unregistrations so we need to initialize new projections
%% and delete unregistered ones, and we need to ensure that the projection
%% tables are up to date for any projections which didn't change.
%%
%% @private

update_projections(OldState, NewState) ->
OldTree = get_tree(OldState),
OldProjections = set_of_projections(get_projections(OldState)),
NewTree = get_tree(NewState),
NewProjections = set_of_projections(get_projections(NewState)),

CommonProjections = sets:intersection(OldProjections, NewProjections),
DeletedProjections = sets:subtract(OldProjections, CommonProjections),
CreatedProjections = sets:subtract(NewProjections, CommonProjections),

%% Tear down any projections which were unregistered.
sets:fold(
fun({_Pattern, Projection}, _Acc) ->
_ = khepri_projection:delete(Projection),
ok
end, ok, DeletedProjections),

%% Initialize any new projections which were registered.
sets:fold(
fun({Pattern, Projection}, _Acc) ->
ok = restore_projection(Projection, NewTree, Pattern)
end, ok, CreatedProjections),

%% Update in-place any projections which were not changed themselves (i.e.
%% the projection name, function and pattern) between old and new states.
%% To do this we will find the matching nodes in the old and new tree for
%% the projection's pattern and trigger the projection based on each
%% matching path's old and new properties.
sets:fold(
fun({Pattern, Projection}, _Acc) ->
ok = update_projection(Pattern, Projection, OldTree, NewTree)
end, ok, CommonProjections),

ok.

-spec set_of_projections(ProjectionTree) -> Projections when
ProjectionTree :: khepri_machine:projection_tree(),
Element :: {khepri_path:native_pattern(),
khepri_projection:projection()},
Projections :: sets:set(Element).
%% Folds the set of projections in a projection tree into a version 2 {@link
%% sets:set()}.
%%
%% @private

set_of_projections(ProjectionTree) ->
khepri_pattern_tree:fold(
ProjectionTree,
fun(Pattern, Projections, Acc) ->
lists:foldl(
fun(Projection, Acc1) ->
Entry = {Pattern, Projection},
sets:add_element(Entry, Acc1)
end, Acc, Projections)
end, sets:new([{version, 2}])).

update_projection(Pattern, Projection, OldTree, NewTree) ->
TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
include_root_props => true},
case khepri_tree:find_matching_nodes(OldTree, Pattern, TreeOptions) of
{ok, OldMatchingNodes} ->
Result = khepri_tree:find_matching_nodes(
NewTree, Pattern, TreeOptions),
case Result of
{ok, NewMatchingNodes} ->
Updates = diff_matching_nodes(
OldMatchingNodes, NewMatchingNodes),
maps:foreach(
fun(Path, {OldProps, NewProps}) ->
khepri_projection:trigger(
Projection, Path, OldProps, NewProps)
end, Updates);
Error ->
?LOG_DEBUG(
"Failed to refresh projection ~s due to an error "
"finding matching nodes in the new tree: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]})
end;
Error ->
?LOG_DEBUG(
"Failed to refresh projection ~s due to an error finding "
"matching nodes in the old tree: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]})
end.

-spec diff_matching_nodes(OldNodeProps, NewNodeProps) -> Changes when
OldNodeProps :: khepri_adv:node_props_map(),
NewNodeProps :: khepri_adv:node_props_map(),
OldProps :: khepri:node_props(),
NewProps :: khepri:node_props(),
Changes :: #{khepri_path:native_path() => {OldProps, NewProps}}.
%% @private

diff_matching_nodes(OldNodeProps, NewNodeProps) ->
CommonProps = maps:intersect_with(
fun(_Path, OldProps, NewProps) -> {OldProps, NewProps} end,
OldNodeProps, NewNodeProps),
CommonPaths = maps:keys(CommonProps),
AllProps = maps:fold(
fun(Path, OldProps, Acc) ->
Acc#{Path => {OldProps, #{}}}
end, CommonProps, maps:without(CommonPaths, OldNodeProps)),
maps:fold(
fun(Path, NewProps, Acc) ->
Acc#{Path => {#{}, NewProps}}
end, AllProps, maps:without(CommonPaths, NewNodeProps)).
52 changes: 42 additions & 10 deletions src/khepri_pattern_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
-export([empty/0,
is_empty/1,
update/3,
fold/5,
fold/3,
fold_matching/5,
foreach/2,
compile/1,
map_fold/3,
Expand Down Expand Up @@ -131,7 +132,37 @@ update_payload(#pattern_node{payload = Payload} = PatternTree, UpdateFun) ->
Payload1 = UpdateFun(Payload),
PatternTree#pattern_node{payload = Payload1}.

-spec fold(PatternTree, Tree, Path, FoldFun, Acc) -> Ret when
-spec fold(PatternTree, Fun, Acc) -> Ret when
PatternTree :: khepri_pattern_tree:tree(Payload),
Fun :: fold_fun(Payload),
Acc :: fold_acc(),
Ret :: fold_acc(),
Payload :: payload().
%% @doc Folds over the pattern tree passing each pattern, payload and the
%% accumulator to the fold function.
%%
%% Unlike {@link fold_matching/5} all payloads are passed to the fold function.

fold(#pattern_node{} = PatternTree, Fun, Acc) ->
fold(PatternTree, Fun, [], Acc).

fold(
#pattern_node{payload = Payload, child_nodes = ChildNodes},
Fun, ReversedPath, Acc) ->
Acc1 = case Payload of
?NO_PAYLOAD ->
Acc;
_ ->
Pattern = lists:reverse(ReversedPath),
Fun(Pattern, Payload, Acc)
end,
maps:fold(
fun(PatternComponent, Child, Acc2) ->
ReversedPath1 = [PatternComponent | ReversedPath],
fold(Child, Fun, ReversedPath1, Acc2)
end, Acc1, ChildNodes).

-spec fold_matching(PatternTree, Tree, Path, FoldFun, Acc) -> Ret when
PatternTree :: khepri_pattern_tree:tree(Payload),
Tree :: khepri_tree:tree(),
Path :: khepri_path:native_path(),
Expand All @@ -149,17 +180,18 @@ update_payload(#pattern_node{payload = Payload} = PatternTree, UpdateFun) ->
%% @see fold_fun().
%% @see fold_acc().

fold(PatternTree, Tree, Path, FoldFun, Acc) ->
fold_matching(PatternTree, Tree, Path, FoldFun, Acc) ->
Path1 = khepri_path:realpath(Path),
case Path1 of
[] ->
fold_data(PatternTree, [], FoldFun, Acc);
_ ->
Root = Tree#tree.root,
fold1(PatternTree, Root, Path, FoldFun, Acc, [])
fold_matching1(PatternTree, Root, Path, FoldFun, Acc, [])
end.

-spec fold1(PatternTree, Node, Path, FoldFun, Acc, ReversedPath) -> Ret when
-spec fold_matching1(PatternTree, Node, Path, FoldFun, Acc, ReversedPath) ->
Ret when
PatternTree :: khepri_pattern_tree:tree(Payload),
Node :: khepri_tree:tree_node(),
Path :: khepri_path:native_path(),
Expand All @@ -170,10 +202,10 @@ fold(PatternTree, Tree, Path, FoldFun, Acc) ->
Payload :: payload().
%% @private

fold1(
fold_matching1(
_PatternTree, _Node, [], _FoldFun, Acc, _ReversedPath) ->
Acc;
fold1(
fold_matching1(
PatternTree, Parent, [Component | Rest], FoldFun, Acc, ReversedPath) ->
case Parent of
#node{child_nodes = #{Component := Node}} ->
Expand All @@ -200,7 +232,7 @@ fold1(
%% not at the end of the path).
%%
%% We continue with the next component.
fold1(
fold_matching1(
PatternSubtree, Node, Rest,
FoldFun, Acc0, ReversedPath1);
true when AppliesToGrandchildren ->
Expand All @@ -220,10 +252,10 @@ fold1(
PatternTree1 = PatternTree#pattern_node{
child_nodes =
#{Condition => PatternSubtree}},
Acc1 = fold1(
Acc1 = fold_matching1(
PatternTree1, Node, Rest,
FoldFun, Acc0, ReversedPath1),
Acc2 = fold1(
Acc2 = fold_matching1(
PatternSubtree, Node, Rest,
FoldFun, Acc1, ReversedPath1),
Acc2;
Expand Down
Loading

0 comments on commit 1ad4a86

Please sign in to comment.