Skip to content

Commit

Permalink
grpcbox-stream: Fix to support multiple simultaneous unary/strem calls
Browse files Browse the repository at this point in the history
This commit addresses stress-test failure mentioned in tsloughter#29.

Added two new APIs:
add_channel(Name, Endpoints, Options)
delete_channel(Pid)

This would give ability to user to add and delete channels on the fly.

Also modified stress_test test case to use this logic. With out this change, stress test fails around 10 simultaneous connections. With this change I can see around 90 simultaneous connections.

Signed-off-by: Vasu Dasari <[email protected]>
  • Loading branch information
vasu-dasari committed Nov 16, 2020
1 parent 3ad2a98 commit 1ceee49
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
21 changes: 20 additions & 1 deletion src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
-export([start_link/3,
is_ready/1,
pick/2,
stop/1]).
stop/1,
add_channel/3,
delete_channel/1]).
-export([init/1,
callback_mode/0,
terminate/3,
connected/3,
idle/3]).

-include_lib("stdlib/include/ms_transform.hrl").
-include("grpcbox.hrl").

-define(CHANNEL(Name), {via, gproc, {n, l, {?MODULE, Name}}}).
Expand Down Expand Up @@ -45,6 +48,16 @@
stats_handler :: module() | undefined,
refresh_interval :: timer:time()}).

%% @doc add a new channel
-spec add_channel(name(), [endpoint()], options()) -> {ok, pid()}.
add_channel(Name, Endpoints, Options) ->
grpcbox_channel_sup:start_child(Name, Endpoints, Options).

%% @doc Delete a channel
-spec delete_channel(pid()) -> any().
delete_channel(Pid) when is_pid(Pid) ->
ok = supervisor:terminate_child(grpcbox_channel_sup, Pid).

-spec start_link(name(), [endpoint()], options()) -> {ok, pid()}.
start_link(Name, Endpoints, Options) ->
gen_statem:start_link(?CHANNEL(Name), ?MODULE, [Name, Endpoints, Options], []).
Expand Down Expand Up @@ -129,6 +142,12 @@ handle_event(_, _, Data) ->
{keep_state, Data}.

terminate(_Reason, _State, #data{pool=Name}) ->
[ets:delete(?CHANNELS_TAB, Key) || Key <-
ets:select(?CHANNELS_TAB, ets:fun2ms(fun
({{N, V}, _}) when N == Name ->
{N, V}
end))
],
gproc_pool:force_delete(Name),
ok.

Expand Down
28 changes: 24 additions & 4 deletions test/grpcbox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ multiple_servers(_Config) ->
unary(_Config),
unary(_Config).

bidirectional(_Config) ->
{ok, S} = routeguide_route_guide_client:route_chat(ctx:new()),
bidirectional(Config) ->
{ok, S} = routeguide_route_guide_client:route_chat(ctx:new(), proplists:get_value(options, Config, #{})),
%% send 2 before receiving since the server only sends what it already had in its list of messages for the
%% location of your last send.
ok = grpcbox_client:send(S, #{location => #{latitude => 1, longitude => 1}, message => <<"hello there">>}),
Expand Down Expand Up @@ -561,10 +561,30 @@ stress_test(Config) ->

stress_test(Config, Count) ->
lists:foreach(fun
(Ref) ->
(ProcId) ->
Parent = self(),
spawn(fun() ->
stress_test_function(fun bidirectional/1, Config, Ref, Parent) end)
Channel = erlang:list_to_atom("proc_" ++ erlang:integer_to_list(ProcId)),
erlang:register(Channel, self()),
{ok, _Pid} = grpcbox_channel:add_channel(
Channel,
[{http, "localhost", 8080, []}],
#{}
),
lists:foldl(fun
(_, not_ready) ->
timer:sleep(10),
grpcbox_channel:is_ready(Channel);
(_,Acc) ->
Acc
end, not_ready, lists:seq(1, 100)),

stress_test_function(fun bidirectional/1,
[{options,#{channel => Channel}} | Config],
ProcId, Parent),
ok
%% grpcbox_channel:delete_channel(Pid)
end)
end, lists:seq(1, Count)),

Loop = fun Loop(LoopCount) ->
Expand Down

0 comments on commit 1ceee49

Please sign in to comment.