From ef505f1ee0e6008a9108286ab0ed276d61de01cf Mon Sep 17 00:00:00 2001 From: "Alejandro M. Ramallo" Date: Thu, 22 Aug 2024 20:09:08 +0100 Subject: [PATCH] Revert change from gen_server to partisan_gen_server Signed-off-by: Alejandro M. Ramallo --- src/partisan_plumtree_broadcast.erl | 111 +++++++++++++++++++--------- 1 file changed, 76 insertions(+), 35 deletions(-) diff --git a/src/partisan_plumtree_broadcast.erl b/src/partisan_plumtree_broadcast.erl index 13e8a8f5..0a626726 100644 --- a/src/partisan_plumtree_broadcast.erl +++ b/src/partisan_plumtree_broadcast.erl @@ -72,7 +72,7 @@ %% ----------------------------------------------------------------------------- -module(partisan_plumtree_broadcast). --behaviour(partisan_gen_server). +-behaviour(gen_server). -include("partisan.hrl"). -include("partisan_logger.hrl"). @@ -307,7 +307,7 @@ is_map(Opts) -> StartOpts = [ {spawn_opt, ?PARALLEL_SIGNAL_OPTIMISATION([])} ], - partisan_gen_server:start_link({local, ?SERVER}, ?MODULE, Args, StartOpts). + gen_server:start_link({local, ?SERVER}, ?MODULE, Args, StartOpts). %% ----------------------------------------------------------------------------- @@ -326,7 +326,7 @@ is_map(Opts) -> broadcast(Broadcast, Mod) -> {MessageId, Payload} = Mod:broadcast_data(Broadcast), - partisan_gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}). + gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}). %% ----------------------------------------------------------------------------- @@ -341,7 +341,12 @@ broadcast(Broadcast, Mod) -> -spec broadcast_channel(Mod :: module()) -> partisan:channel(). broadcast_channel(Mod) -> - partisan_util:apply(Mod, broadcast_channel, [], ?DEFAULT_CHANNEL). + case erlang:function_exported(Mod, broadcast_channel, 0) of + true -> + Mod:broadcast_channel(); + false -> + ?DEFAULT_CHANNEL + end. %% ----------------------------------------------------------------------------- @@ -350,11 +355,11 @@ broadcast_channel(Mod) -> %% partisan_peer_service:add_sup_callback(fun ?MODULE:update/1), %% @end %% ----------------------------------------------------------------------------- --spec update([node()]) -> ok | no_return(). +-spec update([node()]) -> ok. update(LocalState0) -> - MemberList = partisan_peer_service:decode(LocalState0), - partisan_gen_server:cast(?SERVER, {update, MemberList}). + LocalState = partisan_peer_service:decode(LocalState0), + gen_server:cast(?SERVER, {update, LocalState}). %% ----------------------------------------------------------------------------- @@ -362,7 +367,7 @@ update(LocalState0) -> %% Wait indefinitely for a response is returned from the process. %% @end %% ----------------------------------------------------------------------------- --spec broadcast_members() -> nodeset() | no_return(). +-spec broadcast_members() -> nodeset(). broadcast_members() -> broadcast_members(infinity). @@ -373,10 +378,10 @@ broadcast_members() -> %% Waits `Timeout' ms for a response from the server. %% @end %% ----------------------------------------------------------------------------- --spec broadcast_members(infinity | pos_integer()) -> nodeset() | no_return(). +-spec broadcast_members(infinity | pos_integer()) -> nodeset(). broadcast_members(Timeout) -> - partisan_gen_server:call(?SERVER, broadcast_members, Timeout). + gen_server:call(?SERVER, broadcast_members, Timeout). %% ----------------------------------------------------------------------------- @@ -384,17 +389,19 @@ broadcast_members(Timeout) -> %% running. %% @end %% ----------------------------------------------------------------------------- --spec exchanges() -> exchanges() | no_return(). +-spec exchanges() -> {ok, exchanges()}. exchanges() -> - partisan_gen_server:call(?SERVER, exchanges, infinity). + gen_server:call(?SERVER, exchanges, infinity). %% ----------------------------------------------------------------------------- %% @doc Returns a list of running exchanges, started on `Node'. %% @end %% ----------------------------------------------------------------------------- --spec exchanges(node()) -> exchanges() | no_return(). +-spec exchanges(node()) -> + {ok, exchanges()} + | {error, {badrpc, Reason :: any()}}. exchanges(Node) -> exchanges(Node, infinity). @@ -404,61 +411,71 @@ exchanges(Node) -> %% @doc Returns a list of running exchanges, started on `Node'. %% @end %% ----------------------------------------------------------------------------- --spec exchanges(node(), timeout()) -> exchanges() | no_return(). +-spec exchanges(node(), timeout()) -> + {ok, exchanges()} + | {error, {badrpc, Reason :: any()}}. exchanges(Node, Timeout) -> - partisan_gen_server:call({?SERVER, Node}, exchanges, Timeout). + %% This will not work because gen_server uses disterl + %% TODO reconsider turning this server into a partisan_gen_serv + %% gen_server:call({?SERVER, Node}, exchanges, infinity). + case partisan_rpc:call(Node, ?SERVER, exchanges, [], Timeout) of + {ok, _} = OK -> + %% eqwalizer:ignore + OK; + {badrpc, _} = Reason -> + {error, Reason} + end. %% ----------------------------------------------------------------------------- %% @doc Cancel exchanges started by this node. %% @end %% ----------------------------------------------------------------------------- --spec cancel_exchanges(selector()) -> exchanges() | no_return(). +-spec cancel_exchanges(selector()) -> exchanges(). cancel_exchanges(Selector) -> - partisan_gen_server:call(?SERVER, {cancel_exchanges, Selector}, infinity). + gen_server:call(?SERVER, {cancel_exchanges, Selector}, infinity). %% ----------------------------------------------------------------------------- %% @doc %% @end %% ----------------------------------------------------------------------------- --spec get_peers(Root :: node()) -> list() | no_return(). +-spec get_peers(Root :: node()) -> list(). get_peers(Root) -> - partisan_gen_server:call(?SERVER, {get_peers, Root}). + gen_server:call(?SERVER, {get_peers, Root}). %% ----------------------------------------------------------------------------- %% @doc %% @end %% ----------------------------------------------------------------------------- --spec get_peers(Root :: node(), Opts :: [partisan:info_opt()]) -> - list() | no_return(). +-spec get_peers(Root :: node(), Opts :: [partisan:info_opt()]) -> list(). get_peers(Root, Opts) when is_list(Opts) -> - partisan_gen_server:call(?SERVER, {get_peers, Root, Opts}). + gen_server:call(?SERVER, {get_peers, Root, Opts}). %% ----------------------------------------------------------------------------- %% @doc %% @end %% ----------------------------------------------------------------------------- --spec get_eager_peers(Root :: node()) -> list() | no_return(). +-spec get_eager_peers(Root :: node()) -> list(). get_eager_peers(Root) -> - partisan_gen_server:call(?SERVER, {get_eager_peers, Root}). + gen_server:call(?SERVER, {get_eager_peers, Root}). %% ----------------------------------------------------------------------------- %% @doc %% @end %% ----------------------------------------------------------------------------- --spec get_lazy_peers(Root :: node()) -> list() | no_return(). +-spec get_lazy_peers(Root :: node()) -> list(). get_lazy_peers(Root) -> - partisan_gen_server:call(?SERVER, {get_lazy_peers, Root}). + gen_server:call(?SERVER, {get_lazy_peers, Root}). @@ -716,7 +733,19 @@ debug_get_peers(Node, Root) -> {nodeset(), nodeset()} | no_return(). debug_get_peers(Node, Root, Timeout) -> - partisan_gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout). + %% This will not work because gen_server uses disterl + %% gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout). + %% TODO reconsider turning this server into a partisan_gen_server + case partisan_rpc:call(Node, ?MODULE, get_peers, [Root], Timeout) of + {badrpc, Reason} -> + error(Reason); + {_, _} = Result -> + %% eqwalizer:ignore + Result; + {_, _, _} = Result -> + %% eqwalizer:ignore + Result + end. %% @doc return the peers for `Node' for the tree rooted at `Root'. @@ -725,7 +754,19 @@ debug_get_peers(Node, Root, Timeout) -> {nodeset(), nodeset(), Info :: map()} | no_return(). debug_get_peers(Node, Root, Opts, Timeout) -> - partisan_gen_server:call({?SERVER, Node}, {get_peers, Root, Opts}, Timeout). + %% This will not work because gen_server uses disterl + %% gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout). + %% TODO reconsider turning this server into a partisan_gen_server + case partisan_rpc:call(Node, ?MODULE, get_peers, [Root, Opts], Timeout) of + {badrpc, Reason} -> + error(Reason); + {_, _} = Result -> + %% eqwalizer:ignore + Result; + {_, _, _} = Result -> + %% eqwalizer:ignore + Result + end. %% ----------------------------------------------------------------------------- @@ -1068,7 +1109,7 @@ exchange(Peer, #state{exchanges = Exchanges} = State, Mod) -> {error, _Reason} -> State; - _ -> + _ -> State end. @@ -1272,8 +1313,8 @@ send(Msg, Mod, Peers) when is_list(Peers) -> send(Msg, Mod, Peer) -> instrument_transmission(Msg, Mod), - Opts = [{channel, broadcast_channel(Mod)}], - partisan_gen_server:cast({?SERVER, Peer}, Msg, Opts). + Opts = #{channel => broadcast_channel(Mod)}, + partisan:cast_message(Peer, ?SERVER, Msg, Opts). %% @private @@ -1296,10 +1337,10 @@ schedule_tick(Message, Timer, Default) -> -spec reset_peers(nodeset(), nodeset(), nodeset(), state()) -> state(). reset_peers(AllMembers, EagerPeers, LazyPeers, State) -> - Node = State#state.node, + MyNode = partisan:node(), State#state{ - common_eagers = ordsets:del_element(Node, EagerPeers), - common_lazys = ordsets:del_element(Node, LazyPeers), + common_eagers = ordsets:del_element(MyNode, EagerPeers), + common_lazys = ordsets:del_element(MyNode, LazyPeers), eager_sets = maps:new(), lazy_sets = maps:new(), all_members = AllMembers @@ -1329,4 +1370,4 @@ instrument_transmission(Message, Mod) -> end, ToLog ) - end. + end. \ No newline at end of file