diff --git a/rebar.config b/rebar.config index 58684152..a16192d4 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,7 @@ %% vim:ft=erlang: {minimum_otp_vsn, "24.0"}. -{deps, [{ra, "2.13.6"}, +{deps, [{ra, "2.14.0"}, {horus, "0.3.0"}]}. {project_plugins, [rebar3_proper, diff --git a/rebar.lock b/rebar.lock index 1ebd941b..81c3cb9f 100644 --- a/rebar.lock +++ b/rebar.lock @@ -2,19 +2,19 @@ [{<<"aten">>,{pkg,<<"aten">>,<<"0.6.0">>},1}, {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.8">>},1}, {<<"horus">>,{pkg,<<"horus">>,<<"0.3.0">>},0}, - {<<"ra">>,{pkg,<<"ra">>,<<"2.13.6">>},0}, + {<<"ra">>,{pkg,<<"ra">>,<<"2.14.0">>},0}, {<<"seshat">>,{pkg,<<"seshat">>,<<"0.6.0">>},1}]}. [ {pkg_hash,[ {<<"aten">>, <<"7A57B275A6DAF515AC3683FB9853E280B4D0DCDD74292FD66AC4A01C8694F8C7">>}, {<<"gen_batch_server">>, <<"7840A1FA63EE1EFFC83E8A91D22664847A2BA1192D30EAFFFD914ACB51578068">>}, {<<"horus">>, <<"0AAB22D88FA8159FFB13F83A782B0D9B0819C08DAFD4A4C4BEEA0C31F32B626A">>}, - {<<"ra">>, <<"D0B2A33D38C2B20BCB7C898C6DB882BFE73B402B9B21FE08A8454BBD1A83E663">>}, + {<<"ra">>, <<"AEB630926AE0D4A6EDE62C12CC4C3389DDD4E18B5E9CFB84369251B69FA9D257">>}, {<<"seshat">>, <<"3172EB1D7A2A4F66108CD6933A4E465AFF80F84AA90ED83F047B92F636123CCD">>}]}, {pkg_hash_ext,[ {<<"aten">>, <<"5F39A164206AE3F211EF5880B1F7819415686436E3229D30B6A058564FBAA168">>}, {<<"gen_batch_server">>, <<"C3E6A1A2A0FB62AEE631A98CFA0FD8903E9562422CBF72043953E2FB1D203017">>}, {<<"horus">>, <<"DABE0BD901159D31ADA944310302A155324FE421ADCB12BFC4BB7EABDD9625F8">>}, - {<<"ra">>, <<"0BE7645DCE4A76EDD4C4642D0FA69639518C72B6B60A34FC86590D1909166AEB">>}, + {<<"ra">>, <<"1D553DD971A0B398B7AF0FA8C8458DDA575715FF71C65C972E9500B24039B240">>}, {<<"seshat">>, <<"7CEF700F92831DD7CAE6A6DD223CCC55AC88ECCE0631EE9AB0F2B5FB70E79B90">>}]} ]. 
diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index a8155e3a..5e016159 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -78,7 +78,7 @@ handle_aux/6, apply/3, state_enter/2, - snapshot_installed/2, + snapshot_installed/4, overview/1, version/0, which_module/1]). @@ -1565,9 +1565,23 @@ state_enter(_StateName, _State) -> %% @private -snapshot_installed(_Meta, _State) -> - SideEffect = {aux, restore_projections}, - [SideEffect]. +snapshot_installed( + #{machine_version := NewMacVer}, NewState, + #{machine_version := OldMacVer}, OldState) -> + %% A snapshot might be installed on a follower member who has fallen + %% sufficiently far behind in replication of the log from the leader. When + %% a member installs a snapshot it needs to update its projections: new + %% projections may have been registered since the snapshot or old ones + %% unregistered. Projections which did not change need to be triggered + %% with the new changes to state, similar to the `restore_projections' aux + %% effect. Also see `update_projections/2'. + %% + %% Note that the snapshot installation might bump the effective machine + %% version so we need to convert the old state to the new machine version. + OldState1 = convert_state(OldState, OldMacVer, NewMacVer), + ok = update_projections(OldState1, NewState), + ok = clear_compiled_projection_tree(), + []. %% @private @@ -1828,7 +1842,7 @@ create_projection_side_effects2( delete -> InitialTree end, - khepri_pattern_tree:fold( + khepri_pattern_tree:fold_matching( ProjectionTree, PatternMatchingTree, Path, @@ -2331,3 +2345,129 @@ convert_state(State, 0, 1) -> State1 = list_to_tuple(Fields1), ?assert(is_state(State1)), State1. + +-spec update_projections(OldState, NewState) -> ok when + OldState :: khepri_machine:state(), + NewState :: khepri_machine:state(). +%% @doc Updates the machine's projections to account for changes between two +%% states. 
+%% +%% This is used when installing a snapshot - the state will jump from the +%% given `OldState' before the snapshot was installed to the given `NewState' +%% after. When we swap states we need to update the projections: the records +%% in the projection tables themselves but also which projection tables exist. +%% The changes glossed over by the snapshot may include projection +%% registrations and unregistrations so we need to initialize new projections +%% and delete unregistered ones, and we need to ensure that the projection +%% tables are up to date for any projections which didn't change. +%% +%% @private + +update_projections(OldState, NewState) -> + OldTree = get_tree(OldState), + OldProjections = set_of_projections(get_projections(OldState)), + NewTree = get_tree(NewState), + NewProjections = set_of_projections(get_projections(NewState)), + + CommonProjections = sets:intersection(OldProjections, NewProjections), + DeletedProjections = sets:subtract(OldProjections, CommonProjections), + CreatedProjections = sets:subtract(NewProjections, CommonProjections), + + %% Tear down any projections which were unregistered. + sets:fold( + fun({_Pattern, Projection}, _Acc) -> + _ = khepri_projection:delete(Projection), + ok + end, ok, DeletedProjections), + + %% Initialize any new projections which were registered. + sets:fold( + fun({Pattern, Projection}, _Acc) -> + ok = restore_projection(Projection, NewTree, Pattern) + end, ok, CreatedProjections), + + %% Update in-place any projections which were not changed themselves (i.e. + %% the projection name, function and pattern) between old and new states. + %% To do this we will find the matching nodes in the old and new tree for + %% the projection's pattern and trigger the projection based on each + %% matching path's old and new properties. + sets:fold( + fun({Pattern, Projection}, _Acc) -> + ok = update_projection(Pattern, Projection, OldTree, NewTree) + end, ok, CommonProjections), + + ok. 
+ +-spec set_of_projections(ProjectionTree) -> Projections when + ProjectionTree :: khepri_machine:projection_tree(), + Element :: {khepri_path:native_pattern(), + khepri_projection:projection()}, + Projections :: sets:set(Element). +%% Folds the set of projections in a projection tree into a version 2 {@link +%% sets:set()}. +%% +%% @private + +set_of_projections(ProjectionTree) -> + khepri_pattern_tree:fold( + ProjectionTree, + fun(Pattern, Projections, Acc) -> + lists:foldl( + fun(Projection, Acc1) -> + Entry = {Pattern, Projection}, + sets:add_element(Entry, Acc1) + end, Acc, Projections) + end, sets:new([{version, 2}])). + +update_projection(Pattern, Projection, OldTree, NewTree) -> + TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN, + include_root_props => true}, + case khepri_tree:find_matching_nodes(OldTree, Pattern, TreeOptions) of + {ok, OldMatchingNodes} -> + Result = khepri_tree:find_matching_nodes( + NewTree, Pattern, TreeOptions), + case Result of + {ok, NewMatchingNodes} -> + Updates = diff_matching_nodes( + OldMatchingNodes, NewMatchingNodes), + maps:foreach( + fun(Path, {OldProps, NewProps}) -> + khepri_projection:trigger( + Projection, Path, OldProps, NewProps) + end, Updates); + Error -> + ?LOG_DEBUG( + "Failed to refresh projection ~s due to an error " + "finding matching nodes in the new tree: ~p", + [khepri_projection:name(Projection), Error], + #{domain => [khepri, ra_machine]}) + end; + Error -> + ?LOG_DEBUG( + "Failed to refresh projection ~s due to an error finding " + "matching nodes in the old tree: ~p", + [khepri_projection:name(Projection), Error], + #{domain => [khepri, ra_machine]}) + end. + +-spec diff_matching_nodes(OldNodeProps, NewNodeProps) -> Changes when + OldNodeProps :: khepri_adv:node_props_map(), + NewNodeProps :: khepri_adv:node_props_map(), + OldProps :: khepri:node_props(), + NewProps :: khepri:node_props(), + Changes :: #{khepri_path:native_path() => {OldProps, NewProps}}. 
+%% @private + +diff_matching_nodes(OldNodeProps, NewNodeProps) -> + CommonProps = maps:intersect_with( + fun(_Path, OldProps, NewProps) -> {OldProps, NewProps} end, + OldNodeProps, NewNodeProps), + CommonPaths = maps:keys(CommonProps), + AllProps = maps:fold( + fun(Path, OldProps, Acc) -> + Acc#{Path => {OldProps, #{}}} + end, CommonProps, maps:without(CommonPaths, OldNodeProps)), + maps:fold( + fun(Path, NewProps, Acc) -> + Acc#{Path => {#{}, NewProps}} + end, AllProps, maps:without(CommonPaths, NewNodeProps)). diff --git a/src/khepri_pattern_tree.erl b/src/khepri_pattern_tree.erl index 6d7d75d4..cc80e1a3 100644 --- a/src/khepri_pattern_tree.erl +++ b/src/khepri_pattern_tree.erl @@ -51,7 +51,8 @@ -export([empty/0, is_empty/1, update/3, - fold/5, + fold/3, + fold_matching/5, foreach/2, compile/1, map_fold/3, @@ -131,7 +132,37 @@ update_payload(#pattern_node{payload = Payload} = PatternTree, UpdateFun) -> Payload1 = UpdateFun(Payload), PatternTree#pattern_node{payload = Payload1}. --spec fold(PatternTree, Tree, Path, FoldFun, Acc) -> Ret when +-spec fold(PatternTree, Fun, Acc) -> Ret when + PatternTree :: khepri_pattern_tree:tree(Payload), + Fun :: fold_fun(Payload), + Acc :: fold_acc(), + Ret :: fold_acc(), + Payload :: payload(). +%% @doc Folds over the pattern tree passing each pattern, payload and the +%% accumulator to the fold function. +%% +%% Unlike {@link fold_matching/5} all payloads are passed to the fold function. + +fold(#pattern_node{} = PatternTree, Fun, Acc) -> + fold(PatternTree, Fun, [], Acc). + +fold( + #pattern_node{payload = Payload, child_nodes = ChildNodes}, + Fun, ReversedPath, Acc) -> + Acc1 = case Payload of + ?NO_PAYLOAD -> + Acc; + _ -> + Pattern = lists:reverse(ReversedPath), + Fun(Pattern, Payload, Acc) + end, + maps:fold( + fun(PatternComponent, Child, Acc2) -> + ReversedPath1 = [PatternComponent | ReversedPath], + fold(Child, Fun, ReversedPath1, Acc2) + end, Acc1, ChildNodes). 
+ +-spec fold_matching(PatternTree, Tree, Path, FoldFun, Acc) -> Ret when PatternTree :: khepri_pattern_tree:tree(Payload), Tree :: khepri_tree:tree(), Path :: khepri_path:native_path(), @@ -149,17 +180,18 @@ update_payload(#pattern_node{payload = Payload} = PatternTree, UpdateFun) -> %% @see fold_fun(). %% @see fold_acc(). -fold(PatternTree, Tree, Path, FoldFun, Acc) -> +fold_matching(PatternTree, Tree, Path, FoldFun, Acc) -> Path1 = khepri_path:realpath(Path), case Path1 of [] -> fold_data(PatternTree, [], FoldFun, Acc); _ -> Root = Tree#tree.root, - fold1(PatternTree, Root, Path, FoldFun, Acc, []) + fold_matching1(PatternTree, Root, Path, FoldFun, Acc, []) end. --spec fold1(PatternTree, Node, Path, FoldFun, Acc, ReversedPath) -> Ret when +-spec fold_matching1(PatternTree, Node, Path, FoldFun, Acc, ReversedPath) -> + Ret when PatternTree :: khepri_pattern_tree:tree(Payload), Node :: khepri_tree:tree_node(), Path :: khepri_path:native_path(), @@ -170,10 +202,10 @@ fold(PatternTree, Tree, Path, FoldFun, Acc) -> Payload :: payload(). %% @private -fold1( +fold_matching1( _PatternTree, _Node, [], _FoldFun, Acc, _ReversedPath) -> Acc; -fold1( +fold_matching1( PatternTree, Parent, [Component | Rest], FoldFun, Acc, ReversedPath) -> case Parent of #node{child_nodes = #{Component := Node}} -> @@ -200,7 +232,7 @@ fold1( %% not at the end of the path). %% %% We continue with the next component. 
- fold1( + fold_matching1( PatternSubtree, Node, Rest, FoldFun, Acc0, ReversedPath1); true when AppliesToGrandchildren -> @@ -220,10 +252,10 @@ fold1( PatternTree1 = PatternTree#pattern_node{ child_nodes = #{Condition => PatternSubtree}}, - Acc1 = fold1( + Acc1 = fold_matching1( PatternTree1, Node, Rest, FoldFun, Acc0, ReversedPath1), - Acc2 = fold1( + Acc2 = fold_matching1( PatternSubtree, Node, Rest, FoldFun, Acc1, ReversedPath1), Acc2; diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index 69ceb7d9..7ae03cb4 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -2021,16 +2021,17 @@ projections_are_updated_when_a_snapshot_is_installed(Config) -> %% When this happens the member doesn't see the changes as regular %% commands (i.e. handled in `ra_machine:apply/3'). Instead the machine %% state is replaced entirely. So when a snapshot is installed we must - %% restore projections the same way we do as when we restart a member. - %% In `khepri_machine' this is done in the `snapshot_installed/2` callback - %% implementation. + %% restore projections. The machine is alive at this point though so we + %% can't restore projections the same way as when we restart a member + %% though. We need to diff the old and new state first to handle any newly + %% registered or unregistered projections and then to update any existing + %% projections with updated or deleted records. In `khepri_machine' this is + %% done in the `snapshot_installed/3' callback implementation. %% %% To test this we stop a member, apply enough commands to cause the leader %% to take a snapshot, and then restart the member and assert that the %% projection contents are as expected. - ProjectionName = ?MODULE, - %% We call `khepri_projection:new/2' on the local node and thus need %% Khepri. 
?assertMatch({ok, _}, application:ensure_all_started(khepri)), @@ -2041,9 +2042,9 @@ projections_are_updated_when_a_snapshot_is_installed(Config) -> #{ra_system := RaSystem} = maps:get(Node1, PropsPerNode), StoreId = RaSystem, %% Set the snapshot interval low so that we can trigger a snapshot by - %% sending 4 commands. + %% sending a few commands. RaServerConfig = #{cluster_name => StoreId, - machine_config => #{snapshot_interval => 4}}, + machine_config => #{snapshot_interval => 20}}, ct:pal("Start database + cluster nodes"), lists:foreach( @@ -2061,14 +2062,30 @@ projections_are_updated_when_a_snapshot_is_installed(Config) -> rpc:call(Node, khepri_cluster, join, [StoreId, Node3])) end, [Node1, Node2]), - ct:pal("Register projection on node ~s", [Node1]), - Projection = khepri_projection:new( - ProjectionName, - fun(Path, Payload) -> {Path, Payload} end), + ProjectionName1 = projection_1, + Projection1 = khepri_projection:new( + ProjectionName1, + fun(Path, Payload) -> {Path, Payload} end), + ProjectionName2 = projection_2, + Projection2 = khepri_projection:new( + ProjectionName2, + fun(Path, Payload) -> {Path, Payload} end), + ProjectionName3 = projection_3, + Projection3 = khepri_projection:new( + ProjectionName3, + fun(Path, Payload) -> {Path, Payload} end), + + ct:pal("Register projection ~ts on node ~s", [ProjectionName1, Node1]), rpc:call(Node1, khepri, register_projection, - [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection]), - ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName), + [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection1]), + ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName1), + + ct:pal("Register projection ~ts on node ~s", [ProjectionName2, Node1]), + rpc:call(Node1, + khepri, register_projection, + [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection2]), + ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName2), ?assertEqual( ok, @@ -2076,29 +2093,47 @@ 
projections_are_updated_when_a_snapshot_is_installed(Config) -> #{reply_from => local}])), ?assertEqual( value1v1, - rpc:call(Node3, ets, lookup_element, [ProjectionName, [key1], 2])), + rpc:call(Node3, ets, lookup_element, [ProjectionName1, [key1], 2])), + %% This key will be deleted. + ?assertEqual( + ok, + rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1, + #{reply_from => local}])), + %% So far there isn't a snapshot. + ?assertMatch( + {ok, #{log := #{snapshot_index := undefined}}, _}, + ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))), ct:pal( "Stop cluster member ~s (quorum is maintained)", [Node1]), ok = rpc:call(Node1, khepri, stop, [StoreId]), - ?assertMatch( - {ok, #{log := #{snapshot_index := undefined}}, _}, - ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))), - - ct:pal("Submit enough commands to trigger a snapshot"), + ct:pal("Modify paths which are watched by projections"), ct:pal("- set key1:value1v2"), ok = rpc:call(Node3, khepri, put, [StoreId, [key1], value1v2]), - ct:pal("- set key2:value2v1"), - ok = rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1]), + ct:pal("- delete key2"), + ok = rpc:call(Node3, khepri, delete, [StoreId, [key2]]), ct:pal("- set key3:value3v1"), ok = rpc:call(Node3, khepri, put, [StoreId, [key3], value3v1]), ct:pal("- set key4:value4v1"), ok = rpc:call(Node3, khepri, put, [StoreId, [key4], value4v1]), - {ok, #{log := #{snapshot_index := SnapshotIndex}}, _} = - ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3)), - ?assert(is_number(SnapshotIndex) andalso SnapshotIndex > 4), + ct:pal("Register projection ~ts on node ~s", [ProjectionName3, Node3]), + rpc:call(Node3, + khepri, register_projection, + [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection3]), + ct:pal("Unregister projection ~ts on node ~s", [ProjectionName2, Node3]), + rpc:call(Node3, + khepri, unregister_projections, [StoreId, [ProjectionName2]]), + + ct:pal("Send many commands to ensure a snapshot is 
triggered"), + [ok = rpc:call(Node3, khepri, put, [StoreId, [key5], value5v1]) + || _ <- lists:seq(1, 20)], + + {ok, #{log := #{snapshot_index := SnapshotIndex}}, _} = + ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3)), + ct:pal("New snapshot index: ~p", [SnapshotIndex]), + ?assert(is_number(SnapshotIndex) andalso SnapshotIndex > 20), ct:pal("Restart cluster member ~s", [Node1]), {ok, StoreId} = rpc:call(Node1, khepri, start, [RaSystem, RaServerConfig]), @@ -2112,20 +2147,37 @@ projections_are_updated_when_a_snapshot_is_installed(Config) -> [key5], value5v1, #{reply_from => local}), ?assertEqual( value5v1, - rpc:call(Node1, ets, lookup_element, [ProjectionName, [key5], 2])), - + rpc:call(Node1, ets, lookup_element, [ProjectionName1, [key5], 2])), + + ct:pal("Contents of projection table '~ts'", [ProjectionName1]), + [begin + Contents = rpc:call(Node, ets, tab2list, [ProjectionName1]), + ct:pal("- node ~ts:~n~p", [Node, Contents]) + end || Node <- Nodes], + + [begin + ?assertEqual( + value1v2, + rpc:call(Node, ets, lookup_element, [ProjectionName1, [key1], 2])), + ?assertEqual( + false, + rpc:call(Node, ets, member, [ProjectionName1, [key2]])), + ?assertEqual( + value3v1, + rpc:call(Node, ets, lookup_element, [ProjectionName1, [key3], 2])), + ?assertEqual( + value4v1, + rpc:call(Node, ets, lookup_element, [ProjectionName1, [key4], 2])) + end || Node <- Nodes], + + %% Ensure that the projections themselves are also updated on Node1. + %% ProjectionName2 was unregistered and ProjectionName3 was registered. 
+ ?assertEqual( + undefined, + rpc:call(Node1, ets, info, [ProjectionName2])), ?assertEqual( value1v2, - rpc:call(Node1, ets, lookup_element, [ProjectionName, [key1], 2])), - ?assertEqual( - value2v1, - rpc:call(Node1, ets, lookup_element, [ProjectionName, [key2], 2])), - ?assertEqual( - value3v1, - rpc:call(Node1, ets, lookup_element, [ProjectionName, [key3], 2])), - ?assertEqual( - value4v1, - rpc:call(Node1, ets, lookup_element, [ProjectionName, [key4], 2])), + rpc:call(Node1, ets, lookup_element, [ProjectionName3, [key1], 2])), ok. diff --git a/test/pattern_tree.erl b/test/pattern_tree.erl index eaf78da8..65dd5897 100644 --- a/test/pattern_tree.erl +++ b/test/pattern_tree.erl @@ -102,7 +102,7 @@ fold_finds_all_patterns_matching_a_path_test() -> lists_enumerate(PathPatterns)), PatternTree = khepri_pattern_tree:compile(PatternTree0), MatchingIndices = fun(Path) -> - khepri_pattern_tree:fold( + khepri_pattern_tree:fold_matching( PatternTree, Tree, Path, fun(_PathPattern, Indices, Acc) -> Acc ++ Indices