Skip to content

Commit

Permalink
Revert change from gen_server to partisan_gen_server
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro M. Ramallo <[email protected]>
  • Loading branch information
aramallo committed Aug 22, 2024
1 parent 816a8c7 commit ef505f1
Showing 1 changed file with 76 additions and 35 deletions.
111 changes: 76 additions & 35 deletions src/partisan_plumtree_broadcast.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
%% -----------------------------------------------------------------------------
-module(partisan_plumtree_broadcast).

-behaviour(partisan_gen_server).
-behaviour(gen_server).

-include("partisan.hrl").
-include("partisan_logger.hrl").
Expand Down Expand Up @@ -307,7 +307,7 @@ is_map(Opts) ->
StartOpts = [
{spawn_opt, ?PARALLEL_SIGNAL_OPTIMISATION([])}
],
partisan_gen_server:start_link({local, ?SERVER}, ?MODULE, Args, StartOpts).
gen_server:start_link({local, ?SERVER}, ?MODULE, Args, StartOpts).


%% -----------------------------------------------------------------------------
Expand All @@ -326,7 +326,7 @@ is_map(Opts) ->

broadcast(Broadcast, Mod) ->
{MessageId, Payload} = Mod:broadcast_data(Broadcast),
partisan_gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).
gen_server:cast(?SERVER, {broadcast, MessageId, Payload, Mod}).


%% -----------------------------------------------------------------------------
Expand All @@ -341,7 +341,12 @@ broadcast(Broadcast, Mod) ->
-spec broadcast_channel(Mod :: module()) -> partisan:channel().

broadcast_channel(Mod) ->
partisan_util:apply(Mod, broadcast_channel, [], ?DEFAULT_CHANNEL).
case erlang:function_exported(Mod, broadcast_channel, 0) of
true ->
Mod:broadcast_channel();
false ->
?DEFAULT_CHANNEL
end.


%% -----------------------------------------------------------------------------
Expand All @@ -350,19 +355,19 @@ broadcast_channel(Mod) ->
%% partisan_peer_service:add_sup_callback(fun ?MODULE:update/1),
%% @end
%% -----------------------------------------------------------------------------
-spec update([node()]) -> ok | no_return().
-spec update([node()]) -> ok.

update(LocalState0) ->
MemberList = partisan_peer_service:decode(LocalState0),
partisan_gen_server:cast(?SERVER, {update, MemberList}).
LocalState = partisan_peer_service:decode(LocalState0),
gen_server:cast(?SERVER, {update, LocalState}).


%% -----------------------------------------------------------------------------
%% @doc Returns the broadcast servers view of full cluster membership.
%% Wait indefinitely for a response is returned from the process.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_members() -> nodeset() | no_return().
-spec broadcast_members() -> nodeset().

broadcast_members() ->
broadcast_members(infinity).
Expand All @@ -373,28 +378,30 @@ broadcast_members() ->
%% Waits `Timeout' ms for a response from the server.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_members(infinity | pos_integer()) -> nodeset() | no_return().
-spec broadcast_members(infinity | pos_integer()) -> nodeset().

broadcast_members(Timeout) ->
partisan_gen_server:call(?SERVER, broadcast_members, Timeout).
gen_server:call(?SERVER, broadcast_members, Timeout).


%% -----------------------------------------------------------------------------
%% @doc return a list of exchanges, started by broadcast on this node, that are
%% running.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges() -> exchanges() | no_return().
-spec exchanges() -> {ok, exchanges()}.

exchanges() ->
partisan_gen_server:call(?SERVER, exchanges, infinity).
gen_server:call(?SERVER, exchanges, infinity).


%% -----------------------------------------------------------------------------
%% @doc Returns a list of running exchanges, started on `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges(node()) -> exchanges() | no_return().
-spec exchanges(node()) ->
{ok, exchanges()}
| {error, {badrpc, Reason :: any()}}.

exchanges(Node) ->
exchanges(Node, infinity).
Expand All @@ -404,61 +411,71 @@ exchanges(Node) ->
%% @doc Returns a list of running exchanges, started on `Node'.
%% @end
%% -----------------------------------------------------------------------------
-spec exchanges(node(), timeout()) -> exchanges() | no_return().
-spec exchanges(node(), timeout()) ->
{ok, exchanges()}
| {error, {badrpc, Reason :: any()}}.

exchanges(Node, Timeout) ->
partisan_gen_server:call({?SERVER, Node}, exchanges, Timeout).
%% This will not work because gen_server uses disterl
%% TODO reconsider turning this server into a partisan_gen_serv
%% gen_server:call({?SERVER, Node}, exchanges, infinity).
case partisan_rpc:call(Node, ?SERVER, exchanges, [], Timeout) of
{ok, _} = OK ->
%% eqwalizer:ignore
OK;
{badrpc, _} = Reason ->
{error, Reason}
end.


%% -----------------------------------------------------------------------------
%% @doc Cancel exchanges started by this node.
%% @end
%% -----------------------------------------------------------------------------
-spec cancel_exchanges(selector()) -> exchanges() | no_return().
-spec cancel_exchanges(selector()) -> exchanges().

cancel_exchanges(Selector) ->
partisan_gen_server:call(?SERVER, {cancel_exchanges, Selector}, infinity).
gen_server:call(?SERVER, {cancel_exchanges, Selector}, infinity).


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_peers(Root :: node()) -> list() | no_return().
-spec get_peers(Root :: node()) -> list().

get_peers(Root) ->
partisan_gen_server:call(?SERVER, {get_peers, Root}).
gen_server:call(?SERVER, {get_peers, Root}).


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_peers(Root :: node(), Opts :: [partisan:info_opt()]) ->
list() | no_return().
-spec get_peers(Root :: node(), Opts :: [partisan:info_opt()]) -> list().

get_peers(Root, Opts) when is_list(Opts) ->
partisan_gen_server:call(?SERVER, {get_peers, Root, Opts}).
gen_server:call(?SERVER, {get_peers, Root, Opts}).


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_eager_peers(Root :: node()) -> list() | no_return().
-spec get_eager_peers(Root :: node()) -> list().

get_eager_peers(Root) ->
partisan_gen_server:call(?SERVER, {get_eager_peers, Root}).
gen_server:call(?SERVER, {get_eager_peers, Root}).


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec get_lazy_peers(Root :: node()) -> list() | no_return().
-spec get_lazy_peers(Root :: node()) -> list().

get_lazy_peers(Root) ->
partisan_gen_server:call(?SERVER, {get_lazy_peers, Root}).
gen_server:call(?SERVER, {get_lazy_peers, Root}).



Expand Down Expand Up @@ -716,7 +733,19 @@ debug_get_peers(Node, Root) ->
{nodeset(), nodeset()} | no_return().

debug_get_peers(Node, Root, Timeout) ->
partisan_gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout).
%% This will not work because gen_server uses disterl
%% gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout).
%% TODO reconsider turning this server into a partisan_gen_server
case partisan_rpc:call(Node, ?MODULE, get_peers, [Root], Timeout) of
{badrpc, Reason} ->
error(Reason);
{_, _} = Result ->
%% eqwalizer:ignore
Result;
{_, _, _} = Result ->
%% eqwalizer:ignore
Result
end.


%% @doc return the peers for `Node' for the tree rooted at `Root'.
Expand All @@ -725,7 +754,19 @@ debug_get_peers(Node, Root, Timeout) ->
{nodeset(), nodeset(), Info :: map()} | no_return().

debug_get_peers(Node, Root, Opts, Timeout) ->
partisan_gen_server:call({?SERVER, Node}, {get_peers, Root, Opts}, Timeout).
%% This will not work because gen_server uses disterl
%% gen_server:call({?SERVER, Node}, {get_peers, Root}, Timeout).
%% TODO reconsider turning this server into a partisan_gen_server
case partisan_rpc:call(Node, ?MODULE, get_peers, [Root, Opts], Timeout) of
{badrpc, Reason} ->
error(Reason);
{_, _} = Result ->
%% eqwalizer:ignore
Result;
{_, _, _} = Result ->
%% eqwalizer:ignore
Result
end.


%% -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1068,7 +1109,7 @@ exchange(Peer, #state{exchanges = Exchanges} = State, Mod) ->

{error, _Reason} ->
State;
_ ->
_ ->
State
end.

Expand Down Expand Up @@ -1272,8 +1313,8 @@ send(Msg, Mod, Peers) when is_list(Peers) ->

send(Msg, Mod, Peer) ->
instrument_transmission(Msg, Mod),
Opts = [{channel, broadcast_channel(Mod)}],
partisan_gen_server:cast({?SERVER, Peer}, Msg, Opts).
Opts = #{channel => broadcast_channel(Mod)},
partisan:cast_message(Peer, ?SERVER, Msg, Opts).


%% @private
Expand All @@ -1296,10 +1337,10 @@ schedule_tick(Message, Timer, Default) ->
-spec reset_peers(nodeset(), nodeset(), nodeset(), state()) -> state().

reset_peers(AllMembers, EagerPeers, LazyPeers, State) ->
Node = State#state.node,
MyNode = partisan:node(),
State#state{
common_eagers = ordsets:del_element(Node, EagerPeers),
common_lazys = ordsets:del_element(Node, LazyPeers),
common_eagers = ordsets:del_element(MyNode, EagerPeers),
common_lazys = ordsets:del_element(MyNode, LazyPeers),
eager_sets = maps:new(),
lazy_sets = maps:new(),
all_members = AllMembers
Expand Down Expand Up @@ -1329,4 +1370,4 @@ instrument_transmission(Message, Mod) ->
end,
ToLog
)
end.
end.

0 comments on commit ef505f1

Please sign in to comment.