From cf15a5357a7817011b9523490bf2f55ce89d0065 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 12:15:20 +0100 Subject: [PATCH 01/21] ci: fix ci --- .github/workflows/run_test_case.yaml | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/.github/workflows/run_test_case.yaml b/.github/workflows/run_test_case.yaml index f08cbe0..67efaec 100644 --- a/.github/workflows/run_test_case.yaml +++ b/.github/workflows/run_test_case.yaml @@ -12,15 +12,15 @@ jobs: fail-fast: false matrix: otp: + - erlang:27 - erlang:26 - erlang:25 - - erlang:24 container: image: ${{ matrix.otp }} steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - name: Code dialyzer run: | make xref @@ -30,21 +30,4 @@ jobs: make eunit make ct make cover - - uses: actions/upload-artifact@v1 - if: failure() - with: - name: logs - path: _build/test/logs - coveralls: - runs-on: ubuntu-latest - container: - image: erlang:25 - steps: - - uses: actions/checkout@v1 - - name: Coveralls - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - rebar3 as test do eunit,ct,cover - make coveralls From e704f700c6f171fc8013548afb748e70f075ed98 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 18:31:46 +0100 Subject: [PATCH 02/21] ci: only trigger github actions on pull-request --- .github/workflows/run_test_case.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run_test_case.yaml b/.github/workflows/run_test_case.yaml index 67efaec..97dc87a 100644 --- a/.github/workflows/run_test_case.yaml +++ b/.github/workflows/run_test_case.yaml @@ -1,6 +1,6 @@ name: Run test case -on: [push, pull_request] +on: [pull_request] jobs: From d5d5f2655229759fddbb779b1653751b8308c998 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 12:07:34 +0100 Subject: [PATCH 03/21] fix: enhance connection rate limit Prior to this change, if connection rate is limited, the acceptors will enter suspending state and stop accepting 
the sockets, leaving the sockets in the system backlog. If the acceptor backlog (default=1024) is filled up for a long enough time to cause the majority of the clients to have closed the socket from their end and try to reconnect aggressively, the acceptor may never be able to get a normal socket again. The fix is: in suspending state, accept the sockets and immediately close them to free up the backlog. The close triggers TCP-RST to cut the TCP graceful close overheads. --- src/esockd_acceptor.erl | 100 ++++++++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 29 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index a672947..d34c39d 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -60,7 +60,7 @@ upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), conn_sup :: pid(), - accept_ref :: term() + accept_ref = no_ref :: term() }). %% @doc Start an acceptor @@ -121,7 +121,10 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> }, {next_event, internal, begin_waiting}}. 
-handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> +handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref -> + %% started waiting in suspending state + keep_state_and_data; +handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; @@ -129,13 +132,28 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> Reason =:= emfile; Reason =:= enfile -> - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; + start_suspending(State, 1000); {error, econnaborted} -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; {error, closed} -> {stop, normal, State}; {error, Reason} -> - error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]), + {stop, Reason, State} + end; +handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {keep_state, State#state{accept_ref = Ref}}; + {error, Reason} when + Reason =:= emfile; + Reason =:= enfile + -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, econnaborted} -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, closed} -> + {stop, normal, State}; + {error, Reason} -> {stop, Reason, State} end; handle_event( @@ -144,17 +162,27 @@ handle_event( waiting, State = #state{lsock = LSock, accept_ref = Ref} ) -> - {next_state, token_request, State, {next_event, internal, {token_request, Sock}}}; + NextEvent = {next_event, internal, {token_request, Sock}}, + {next_state, token_request, State#state{accept_ref = no_ref}, NextEvent}; +handle_event( + info, + {inet_async, LSock, Ref, {ok, Sock}}, + suspending, + State = #state{lsock = LSock, accept_ref = Ref} +) -> + _ = close(Sock), + NextEvent = {next_event, internal, 
accept_and_close}, + {keep_state, State#state{accept_ref = no_ref}, NextEvent}; handle_event( - internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter} + internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter} ) -> case esockd_generic_limiter:consume(1, Limiter) of {ok, Limiter2} -> {next_state, accepting, State#state{conn_limiter = Limiter2}, {next_event, internal, {accept, Sock}}}; {pause, PauseTime, Limiter2} -> - {next_state, suspending, State#state{conn_limiter = Limiter2}, - {state_timeout, PauseTime, Content}} + _ = close(Sock), + start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( internal, @@ -181,30 +209,28 @@ handle_event( {ok, _Pid} -> ok; {error, Reason} -> - handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State), + handle_accept_error(Reason, "failed_to_start_connection_process", State), close(Sock) end; {error, Reason} -> - handle_accept_error(Reason, "Tune buffer failed on ~s: ~s", State), + handle_accept_error(Reason, "failed_to_apply_tune_funcs", State), close(Sock) end, {next_state, waiting, State, {next_event, internal, begin_waiting}}; -handle_event(state_timeout, {token_request, _} = Content, suspending, State) -> - {next_state, token_request, State, {next_event, internal, Content}}; handle_event(state_timeout, begin_waiting, suspending, State) -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; handle_event( info, {inet_async, LSock, Ref, {error, Reason}}, - _, + StateName, State = #state{lsock = LSock, accept_ref = Ref} ) -> - handle_socket_error(Reason, State); + handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName); handle_event(Type, Content, StateName, _) -> - error_logger:warning_msg( - "Unhandled message, State:~p, Type:~p Content:~p", - [StateName, Type, Content] - ), + logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", + state_name => StateName, + event_type 
=> Type, + event_content => Content}), keep_state_and_data. terminate(normal, _StateName, #state{}) -> @@ -212,7 +238,8 @@ terminate(normal, _StateName, #state{}) -> terminate(shutdown, _StateName, #state{}) -> ok; terminate(Reason, _StateName, #state{}) -> - error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]), + logger:log(error, #{msg => "esockd_acceptor_terminating", + reaseon => Reason}), ok. code_change(_OldVsn, StateName, State, _Extra) -> @@ -224,6 +251,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> close(Sock) -> try + %% port-close leads to a TPC reset which cuts out the tcp graceful close overheads true = port_close(Sock), receive {'EXIT', Sock, _} -> ok after 1 -> ok end catch @@ -241,28 +269,42 @@ handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), ok; handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> - error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]). + logger:log(error, #{msg => Msg, + listener => esockd:format(Sockname), + cause => Reason}). -handle_socket_error(closed, State) -> +handle_socket_error(closed, State, _StateName) -> {stop, normal, State}; %% {error, econnaborted} -> accept %% {error, esslaccept} -> accept %% {error, esslaccept} -> accept -handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept -> +handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept -> + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. 
%% enfile: The system limit on the total number of open files has been reached. -handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile -> - error_logger:error_msg( - "Accept error on ~s: ~s", - [esockd:format(State#state.sockname), explain_posix(Reason)] - ), - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; -handle_socket_error(Reason, State) -> +handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + start_suspending(State, 1000); +handle_socket_error(Reason, State, _StateName) -> {stop, Reason, State}. explain_posix(emfile) -> "EMFILE (Too many open files)"; explain_posix(enfile) -> "ENFILE (File table overflow)". + +log_system_limit(State, Reason) -> + logger:log(error, #{msg => "cannot_accept_more_connections", + listener => esockd:format(State#state.sockname), + cause => explain_posix(Reason)}). + +start_suspending(State, Timeout) -> + Actions = [{next_event, internal, accept_and_close}, + {state_timeout, Timeout, begin_waiting}], + {next_state, suspending, State, Actions}. From f500702431e0884b5109fea807ca9be3823c31b4 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 12:32:14 +0100 Subject: [PATCH 04/21] fix: maybe -> 'maybe' --- src/esockd_udp.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/esockd_udp.erl b/src/esockd_udp.erl index 9fd2b3d..84f603e 100644 --- a/src/esockd_udp.erl +++ b/src/esockd_udp.erl @@ -55,22 +55,22 @@ -export([proxy_request/1]). --type(maybe(T) :: undefined | T). +-type('maybe'(T) :: undefined | T). 
-record(state, { proto :: atom(), sock :: inet:socket(), port :: inet:port_number(), - rate_limit :: maybe(esockd_rate_limit:bucket()), + rate_limit :: 'maybe'(esockd_rate_limit:bucket()), conn_limiter :: esockd_generic_limiter:limiter(), - limit_timer :: maybe(reference()), + limit_timer :: 'maybe'(reference()), max_peers :: infinity | pos_integer(), peers :: map(), options :: [esockd:option()], access_rules :: list(), mfa :: esockd:mfargs(), - health_check_request :: maybe(binary()), - health_check_reply :: maybe(binary()) + health_check_request :: 'maybe'(binary()), + health_check_reply :: 'maybe'(binary()) }). -define(ACTIVE_N, 100). From 35040c0a81c0d6c3b1bd2a5d44008950f612c7d6 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 16:57:13 +0100 Subject: [PATCH 05/21] chore: remove unused dependency --- rebar.config | 5 ----- 1 file changed, 5 deletions(-) diff --git a/rebar.config b/rebar.config index 202b9f0..11bb374 100644 --- a/rebar.config +++ b/rebar.config @@ -17,11 +17,6 @@ {src_dirs, ["src"]}. - -{deps, - [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} - ]}. - {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. From 9510e4c2a3d34c945f8dc3bf95092aeebe86ad3d Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 17:19:14 +0100 Subject: [PATCH 06/21] fix: dialyzer on otp 27 --- src/esockd_acceptor.erl | 4 ++-- src/esockd_dtls_acceptor.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index d34c39d..c1f8e52 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -201,7 +201,7 @@ handle_event( inet_db:register_socket(Sock, SockMod), %% Inc accepted stats. 
- esockd_server:inc_stats({Proto, ListenOn}, accepted, 1), + _ = esockd_server:inc_stats({Proto, ListenOn}, accepted, 1), case eval_tune_socket_fun(TuneFun, Sock) of {ok, Sock} -> @@ -266,7 +266,7 @@ handle_accept_error(enotconn, _, _) -> handle_accept_error(einval, _, _) -> ok; handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) -> - esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), + _ = esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), ok; handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> logger:log(error, #{msg => Msg, diff --git a/src/esockd_dtls_acceptor.erl b/src/esockd_dtls_acceptor.erl index b6c29de..2b5c60f 100644 --- a/src/esockd_dtls_acceptor.erl +++ b/src/esockd_dtls_acceptor.erl @@ -85,7 +85,7 @@ accepting(internal, accept, case ssl:transport_accept(LSock) of {ok, Sock} -> %% Inc accepted stats. - esockd_server:inc_stats({Proto, ListenOn}, accepted, 1), + _ = esockd_server:inc_stats({Proto, ListenOn}, accepted, 1), _ = case eval_tune_socket_fun(TuneFun, Sock) of {ok, Sock} -> case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of From f66c1aad82ef43ed9268b1705e05c6bf8b0418a8 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 16:10:57 +0100 Subject: [PATCH 07/21] refactor: delete coveralls --- rebar.config | 3 +-- rebar.config.script | 20 -------------------- 2 files changed, 1 insertion(+), 22 deletions(-) delete mode 100644 rebar.config.script diff --git a/rebar.config b/rebar.config index 11bb374..80b133d 100644 --- a/rebar.config +++ b/rebar.config @@ -26,8 +26,7 @@ {profiles, [{test, - [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]}, - {deps, [{meck, "0.8.13"}]}, + [{deps, [{meck, "0.8.13"}]}, {erl_opts, [debug_info]}, {extra_src_dirs, ["examples/client", diff --git a/rebar.config.script b/rebar.config.script deleted file mode 100644 index 5447ffb..0000000 --- a/rebar.config.script +++ /dev/null @@ 
-1,20 +0,0 @@ -%% -*-: erlang -*- - -case {os:getenv("GITHUB_ACTIONS"), os:getenv("GITHUB_TOKEN")} of - {"true", Token} when is_list(Token) -> - CONFIG1 = [{coveralls_repo_token, Token}, - {coveralls_service_job_id, os:getenv("GITHUB_RUN_ID")}, - {coveralls_commit_sha, os:getenv("GITHUB_SHA")}, - {coveralls_service_number, os:getenv("GITHUB_RUN_NUMBER")}, - {coveralls_coverdata, "_build/test/cover/*.coverdata"}, - {coveralls_service_name, "github"} | CONFIG], - case os:getenv("GITHUB_EVENT_NAME") =:= "pull_request" - andalso string:tokens(os:getenv("GITHUB_REF"), "/") of - [_, "pull", PRNO, _] -> - [{coveralls_service_pull_request, PRNO} | CONFIG1]; - _ -> - CONFIG1 - end; - _ -> - CONFIG -end. From 3d2bc5de94f46149531d7149ada53615cf461a80 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 16:13:25 +0100 Subject: [PATCH 08/21] refactor: return socket tune_fun error to acceptor --- src/esockd_acceptor_sup.erl | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/esockd_acceptor_sup.erl b/src/esockd_acceptor_sup.erl index d9ec1dc..f1755c5 100644 --- a/src/esockd_acceptor_sup.erl +++ b/src/esockd_acceptor_sup.erl @@ -32,6 +32,9 @@ %% callbacks -export([tune_socket/2]). +%% Test +-export([tune_socket_fun/1]). + -define(ACCEPTOR_POOL, 16). 
%%-------------------------------------------------------------------- @@ -105,9 +108,19 @@ init({AcceptorMod, AcceptorArgs}) -> %% ------------------------------------------------------------------- tune_socket_fun(Opts) -> - TuneOpts = [ {tune_buffer, proplists:get_bool(tune_buffer, Opts)} - %% optional callback, returns ok | {error, Reason} - , {tune_fun, proplists:get_value(tune_fun, Opts, undefined)}], + Opts1 = case proplists:get_bool(tune_buffer, Opts) of + true -> + [{tune_buffer, true}]; + false -> + [] + end, + Opts2 = case proplists:get_value(tune_fun, Opts) of + undefined -> + []; + MFA -> + [{tune_fun, MFA}] + end, + TuneOpts = Opts1 ++ Opts2, {fun ?MODULE:tune_socket/2, [TuneOpts]}. upgrade_funs(Type, Opts) -> @@ -139,12 +152,15 @@ tune_socket(Sock, [{tune_buffer, true}|More]) -> case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), - _ = esockd_transport:setopts(Sock, [{buffer, BufSz}]), - tune_socket(Sock, More); - Error -> Error - end; -tune_socket(Sock, [{tune_fun, undefined} | More]) -> - tune_socket(Sock, More); + case esockd_transport:setopts(Sock, [{buffer, BufSz}]) of + ok -> + tune_socket(Sock, More); + Error -> + Error + end; + Error -> + Error + end; tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) -> case apply(M, F, A) of ok -> From 823441712dff4314e0348b0a98cd71210faa3b0c Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 16:27:18 +0100 Subject: [PATCH 09/21] feat: add more close counters --- src/esockd_acceptor.erl | 73 +++++++++++++++++++++++++----------- src/esockd_server.erl | 23 ++++++++---- test/esockd_server_SUITE.erl | 2 +- 3 files changed, 67 insertions(+), 31 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index c1f8e52..7b80407 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ 
Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ tune_fun :: esockd:sock_fun(), upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), - conn_sup :: pid(), + conn_sup :: pid() | {function(), list()}, accept_ref = no_ref :: term() }). @@ -125,15 +125,18 @@ handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Re %% started waiting in suspending state keep_state_and_data; handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) -> - case prim_inet:async_accept(LSock, -1) of + case async_accept(LSock) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; {error, Reason} when Reason =:= emfile; Reason =:= enfile -> + inc_stats(State, Reason), + log_system_limit(State, Reason), start_suspending(State, 1000); {error, econnaborted} -> + inc_stats(State, econnaborted), {next_state, waiting, State, {next_event, internal, begin_waiting}}; {error, closed} -> {stop, normal, State}; @@ -141,15 +144,17 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, acc {stop, Reason, State} end; handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> - case prim_inet:async_accept(LSock, -1) of + case async_accept(LSock) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; {error, Reason} when Reason =:= emfile; Reason =:= enfile -> + inc_stats(State, Reason), {keep_state_and_data, {next_event, internal, accept_and_close}}; {error, econnaborted} -> + inc_stats(State, econnaborted), {keep_state_and_data, {next_event, internal, accept_and_close}}; {error, closed} -> {stop, normal, State}; @@ -171,6 +176,7 @@ handle_event( State = #state{lsock = LSock, accept_ref = Ref} ) -> _ = close(Sock), 
+ inc_stats(State, rate_limitted), NextEvent = {next_event, internal, accept_and_close}, {keep_state, State#state{accept_ref = no_ref}, NextEvent}; handle_event( @@ -182,6 +188,7 @@ handle_event( {next_event, internal, {accept, Sock}}}; {pause, PauseTime, Limiter2} -> _ = close(Sock), + inc_stats(State, rate_limitted), start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( @@ -189,8 +196,6 @@ handle_event( {accept, Sock}, accepting, State = #state{ - proto = Proto, - listen_on = ListenOn, sockmod = SockMod, tune_fun = TuneFun, upgrade_funs = UpgradeFuns, @@ -199,22 +204,23 @@ handle_event( ) -> %% make it look like gen_tcp:accept inet_db:register_socket(Sock, SockMod), - - %% Inc accepted stats. - _ = esockd_server:inc_stats({Proto, ListenOn}, accepted, 1), - case eval_tune_socket_fun(TuneFun, Sock) of - {ok, Sock} -> - case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of + {ok, NewSock} -> + case start_connection(ConnSup, NewSock, UpgradeFuns) of {ok, _Pid} -> + %% Inc accepted stats. 
+ inc_stats(State, accepted), ok; {error, Reason} -> handle_accept_error(Reason, "failed_to_start_connection_process", State), - close(Sock) + close(NewSock), + inc_stats(State, Reason) end; - {error, Reason} -> - handle_accept_error(Reason, "failed_to_apply_tune_funcs", State), - close(Sock) + {error, _Reason} -> + %% the socket became invalid before + %% starting the owner process + close(Sock), + inc_stats(State, closed_nostart) end, {next_state, waiting, State, {next_event, internal, begin_waiting}}; handle_event(state_timeout, begin_waiting, suspending, State) -> @@ -225,6 +231,7 @@ handle_event( StateName, State = #state{lsock = LSock, accept_ref = Ref} ) -> + inc_stats(State, Reason), handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName); handle_event(Type, Content, StateName, _) -> logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", @@ -252,7 +259,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> close(Sock) -> try %% port-close leads to a TPC reset which cuts out the tcp graceful close overheads - true = port_close(Sock), + _ = port_close(Sock), receive {'EXIT', Sock, _} -> ok after 1 -> ok end catch error:_ -> ok @@ -265,8 +272,7 @@ handle_accept_error(enotconn, _, _) -> ok; handle_accept_error(einval, _, _) -> ok; -handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) -> - _ = esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), +handle_accept_error(overloaded, _, _) -> ok; handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> logger:log(error, #{msg => Msg, @@ -300,11 +306,34 @@ explain_posix(enfile) -> "ENFILE (File table overflow)". log_system_limit(State, Reason) -> - logger:log(error, #{msg => "cannot_accept_more_connections", - listener => esockd:format(State#state.sockname), - cause => explain_posix(Reason)}). + logger:log(critical, + #{msg => "cannot_accept_more_connections", + listener => esockd:format(State#state.sockname), + cause => explain_posix(Reason)}). 
start_suspending(State, Timeout) -> Actions = [{next_event, internal, accept_and_close}, {state_timeout, Timeout, begin_waiting}], {next_state, suspending, State, Actions}. + +inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) -> + Counter = counter(Tag), + _ = esockd_server:inc_stats({Proto, ListenOn}, Counter, 1), + ok. + +counter(accepted) -> accepted; +counter(closed_nostart) -> closed_nostart; +counter(emfile) -> closed_overloaded; +counter(enfile) -> closed_overloaded; +counter(overloaded) -> closed_overloaded; +counter(rate_limitted) -> closed_rate_limitted; +counter(_) -> closed_other_reasons. + +start_connection(ConnSup, Sock, UpgradeFuns) when is_pid(ConnSup) -> + esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns); +start_connection({F, A}, Sock, UpgradeFuns) when is_function(F) -> + %% only in tests so far + apply(F, A ++ [Sock, UpgradeFuns]). + +async_accept(LSock) -> + prim_inet:async_accept(LSock, -1). diff --git a/src/esockd_server.erl b/src/esockd_server.erl index d4990a8..c4504c7 100644 --- a/src/esockd_server.erl +++ b/src/esockd_server.erl @@ -66,14 +66,14 @@ stop() -> gen_server:stop(?SERVER). -spec(stats_fun({atom(), esockd:listen_on()}, atom()) -> fun()). stats_fun({Protocol, ListenOn}, Metric) -> - init_stats({Protocol, ListenOn}, Metric), + init_stats({Protocol, ListenOn}, [Metric]), fun({inc, Num}) -> esockd_server:inc_stats({Protocol, ListenOn}, Metric, Num); ({dec, Num}) -> esockd_server:dec_stats({Protocol, ListenOn}, Metric, Num) end. --spec(init_stats({atom(), esockd:listen_on()}, atom()) -> ok). -init_stats({Protocol, ListenOn}, Metric) -> - gen_server:call(?SERVER, {init, {Protocol, ListenOn}, Metric}). +-spec(init_stats({atom(), esockd:listen_on()}, [atom()]) -> ok). +init_stats({Protocol, ListenOn}, Metrics) -> + gen_server:call(?SERVER, {init, {Protocol, ListenOn}, Metrics}). -spec(get_stats({atom(), esockd:listen_on()}) -> [{atom(), non_neg_integer()}]). 
get_stats({Protocol, ListenOn}) -> @@ -97,8 +97,13 @@ del_stats({Protocol, ListenOn}) -> -spec ensure_stats({atom(), esockd:listen_on()}) -> ok. ensure_stats(StatsKey) -> - ok = ?MODULE:init_stats(StatsKey, accepted), - ok = ?MODULE:init_stats(StatsKey, closed_overloaded). + Stats = [accepted, + closed_nostart, + closed_overloaded, + closed_rate_limitted, + closed_other_reasons], + ok = ?MODULE:init_stats(StatsKey, Stats), + ok. -spec get_listener_prop(esockd:listener_ref(), _Name) -> _Value | undefined. get_listener_prop(ListenerRef = {_Proto, _ListenOn}, Name) -> @@ -125,8 +130,10 @@ init([]) -> {write_concurrency, true}]), {ok, #state{listener_props = #{}}}. -handle_call({init, {Protocol, ListenOn}, Metric}, _From, State) -> - true = ets:insert(?STATS_TAB, {{{Protocol, ListenOn}, Metric}, 0}), +handle_call({init, {Protocol, ListenOn}, Metrics}, _From, State) -> + lists:foreach(fun(Metric) -> + true = ets:insert(?STATS_TAB, {{{Protocol, ListenOn}, Metric}, 0}) + end, Metrics), {reply, ok, State, hibernate}; handle_call({get_listener_prop, ListenerRef, Name}, _From, diff --git a/test/esockd_server_SUITE.erl b/test/esockd_server_SUITE.erl index 0eb258c..eb37139 100644 --- a/test/esockd_server_SUITE.erl +++ b/test/esockd_server_SUITE.erl @@ -26,7 +26,7 @@ all() -> esockd_ct:all(?MODULE). 
t_inc_dec_stats(_) -> {ok, _} = esockd_server:start_link(), Name = {echo, 3000}, - esockd_server:init_stats(Name, accepting), + esockd_server:init_stats(Name, [accepting]), esockd_server:inc_stats(Name, accepting, 2), esockd_server:inc_stats(Name, accepting, 2), esockd_server:dec_stats(Name, accepting, 1), From e1e44b05055585ff9dd83e01e8f1db5d8dd737d0 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 17:32:13 +0100 Subject: [PATCH 10/21] test: add test case for esockd_acceptor --- test/esockd_acceptor_SUITE.erl | 222 +++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 test/esockd_acceptor_SUITE.erl diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl new file mode 100644 index 0000000..8e12e23 --- /dev/null +++ b/test/esockd_acceptor_SUITE.erl @@ -0,0 +1,222 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_acceptor_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("esockd.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(PORT, 30000 + ?LINE). +-define(COUNTER_ACCPETED, 1). +-define(COUNTER_NOSTART, 2). +-define(COUNTER_OVERLOADED, 3). +-define(COUNTER_RATE_LIMITTED, 4). 
+-define(COUNTER_OTHER_REASONSE, 5). +-define(COUNTER_LAST, 10). + +counter_tag_to_index(accepted) -> ?COUNTER_ACCPETED; +counter_tag_to_index(closed_nostart) -> ?COUNTER_NOSTART; +counter_tag_to_index(closed_overloaded) -> ?COUNTER_OVERLOADED; +counter_tag_to_index(closed_rate_limitted) -> ?COUNTER_RATE_LIMITTED; +counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONSE. + +all() -> esockd_ct:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(_Case, Config) -> + Counters = counters:new(?COUNTER_LAST, []), + meck:new(esockd_server, [passthrough, no_history]), + meck:expect(esockd_server, inc_stats, + fun(_, Tag, Count) -> + Index = counter_tag_to_index(Tag), + counters:add(Counters, Index, Count) + end), + [{counters, Counters} | Config]. + +end_per_testcase(_Case, _Config) -> + meck:unload(esockd_server). + +start(PortNumber, Limiter) -> + start(PortNumber, Limiter, #{}). + +start(PortNumber, Limiter, Opts) -> + SockOpts = [binary, + {active, false}, + {reuseaddr, true}, + {nodelay, true}, + {backlog, maps:get(backlog, Opts, 1024)}], + {ok, ListenSocket} = gen_tcp:listen(PortNumber, SockOpts), + TuneFun = maps:get(tune_fun, Opts, esockd_acceptor_sup:tune_socket_fun([])), + StartConn = {fun ?MODULE:start_connection/3, [Opts]}, + {ok, AccPid} = esockd_acceptor:start_link(tcp, PortNumber, StartConn, TuneFun, _UpFuns = [], Limiter, ListenSocket), + #{lsock => ListenSocket, acceptor => AccPid}. + +stop(#{lsock := ListenSocket, acceptor := AccPid}) -> + ok = gen_statem:stop(AccPid), + gen_tcp:close(ListenSocket), + ok. + +connect(Port) -> + connect(Port, 1000, #{}). + +connect(Port, Timeout, Opts0) -> + Opts = [binary, + {active, maps:get(active, Opts0, false)}, + {nodelay, true}], + gen_tcp:connect("localhost", Port, Opts, Timeout). + +%% This is the very basic test, if this fails, nothing elese matters. 
+t_normal(Config) -> + Port = ?PORT, + Server = start(Port, no_limit()), + {ok, ClientSock} = connect(Port), + try + ok = wait_for_counter(Config, ?COUNTER_ACCPETED, 1, 2000) + after + disconnect(ClientSock), + stop(Server) + end. + +t_rate_limitted(Config) -> + Port = ?PORT, + Pause = 200, + Server = start(Port, pause_then_allow(Pause)), + try + Count = 10, + Socks = lists:map(fun(_) -> + {ok, Sock} = connect(Port, 1000, #{active => true}), + Sock + end, lists:seq(1, Count)), + lists:foreach(fun(Sock) -> + receive + {tcp_closed, Sock} -> + ok; + Other -> + ct:fail({unexpected, Other}) + end + end, Socks), + ok = wait_for_counter(Config, ?COUNTER_RATE_LIMITTED, Count, 2000), + timer:sleep(Pause), + {ok, Sock2} = connect(Port), + ok = wait_for_counter(Config, ?COUNTER_ACCPETED, 1, 2000), + disconnect(Sock2) + after + stop(Server) + end. + +%% Failed to spawn new connection process +t_overloaded(Config) -> + Port = ?PORT, + Server = start(Port, no_limit(), #{start_connection_result => {error, overloaded}}), + {ok, Sock1} = connect(Port), + try + ok = wait_for_counter(Config, ?COUNTER_OVERLOADED, 1, 2000), + disconnect(Sock1) + after + stop(Server) + end. + +%% Failed to tune the socket opts +t_nostart(Config) -> + Port = ?PORT, + Server = start(Port, no_limit(), #{tune_fun => {fun(_) -> {error, einval} end, []}}), + {ok, Sock1} = connect(Port), + try + ok = wait_for_counter(Config, ?COUNTER_NOSTART, 1, 2000), + disconnect(Sock1) + after + stop(Server) + end. + +disconnect(Socket) -> + port_close(Socket), + ok. + +%% no connection can get through +pause_then_allow(Pause) -> + #{module => ?MODULE, + name => pause_then_allow, + current => pause, + next => allow, + pause => Pause + }. + +%% make a no-limit limiter +no_limit() -> + #{module => ?MODULE, name => no_limit}. 
+ +%% limiter callback +consume(_Token, #{name := pause_then_allow} = Limiter) -> + case Limiter of + #{current := pause} -> + {pause, maps:get(pause, Limiter), Limiter#{current => allow}}; + #{current := allow} -> + {ok, Limiter} + end; +consume(_Token, #{name := no_limit} = Limiter) -> + {ok, Limiter}. + +%% inspect during tests +get_pd_counter(Tag) -> + get({counter, Tag}). + +now_ts() -> erlang:system_time(millisecond). + +wait_for_counter(Config, Index, Count, Timeout) -> + Counters = proplists:get_value(counters, Config), + Now = now_ts(), + Deadline = Now + Timeout, + do_wait_for_counter(Counters, Index, Count, Deadline). + +do_wait_for_counter(Counters, Index, Count, Deadline) -> + case counters:get(Counters, Index) of + Count -> + ok; + Other when Other > Count -> + error(#{cause => counter_exceeded_expect, + expected => Count, + counter_index => Index, + got => Other}); + Other -> + case now_ts() > Deadline of + true -> + error(#{cause => timeout, + expected => Count, + counter_index=> Index, + got => Other}); + false -> + timer:sleep(100), + do_wait_for_counter(Counters, Index, Count, Deadline) + end + end. + +%% dummy callback to start connection +start_connection(Opts, _Sock, _UpgradeFuns) -> + case maps:get(start_connection_result, Opts, undefined) of + undefined -> + {ok, pid}; + Other -> + Other + end. 
From 44673da4fc1afc331daa067fa92ded81489b0ca6 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 18:01:25 +0100 Subject: [PATCH 11/21] refactor: move econnaborted to closed_nostart counter group --- src/esockd_acceptor.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 7b80407..615880c 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -149,13 +149,11 @@ handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSoc {keep_state, State#state{accept_ref = Ref}}; {error, Reason} when Reason =:= emfile; - Reason =:= enfile + Reason =:= enfile; + Reason =:= econnaborted -> inc_stats(State, Reason), {keep_state_and_data, {next_event, internal, accept_and_close}}; - {error, econnaborted} -> - inc_stats(State, econnaborted), - {keep_state_and_data, {next_event, internal, accept_and_close}}; {error, closed} -> {stop, normal, State}; {error, Reason} -> @@ -283,14 +281,12 @@ handle_socket_error(closed, State, _StateName) -> {stop, normal, State}; %% {error, econnaborted} -> accept %% {error, esslaccept} -> accept -%% {error, esslaccept} -> accept handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept -> {keep_state, State, {next_event, internal, accept_and_close}}; handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. -%% enfile: The system limit on the total number of open files has been reached. 
handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> log_system_limit(State, Reason), {keep_state, State, {next_event, internal, accept_and_close}}; @@ -322,6 +318,9 @@ inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) -> ok. counter(accepted) -> accepted; +counter(econnaborted) -> closed_nostart; +%% unsure how this could happen +counter(esslaccept ) -> closed_nostart; counter(closed_nostart) -> closed_nostart; counter(emfile) -> closed_overloaded; counter(enfile) -> closed_overloaded; From 18c494a914271ef62d3e71cf107612334b9f965d Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 19:04:38 +0100 Subject: [PATCH 12/21] fix: overload detection is a tune-fun --- src/esockd_acceptor.erl | 8 ++------ src/esockd_acceptor_sup.erl | 6 +++--- src/esockd_server.erl | 1 - test/esockd_SUITE.erl | 2 +- test/esockd_acceptor_SUITE.erl | 18 ++++++++---------- 5 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 615880c..db3be77 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -214,11 +214,11 @@ handle_event( close(NewSock), inc_stats(State, Reason) end; - {error, _Reason} -> + {error, Reason} -> %% the socket became invalid before %% starting the owner process close(Sock), - inc_stats(State, closed_nostart) + inc_stats(State, Reason) end, {next_state, waiting, State, {next_event, internal, begin_waiting}}; handle_event(state_timeout, begin_waiting, suspending, State) -> @@ -318,10 +318,6 @@ inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) -> ok. 
counter(accepted) -> accepted; -counter(econnaborted) -> closed_nostart; -%% unsure how this could happen -counter(esslaccept ) -> closed_nostart; -counter(closed_nostart) -> closed_nostart; counter(emfile) -> closed_overloaded; counter(enfile) -> closed_overloaded; counter(overloaded) -> closed_overloaded; diff --git a/src/esockd_acceptor_sup.erl b/src/esockd_acceptor_sup.erl index f1755c5..6dccddd 100644 --- a/src/esockd_acceptor_sup.erl +++ b/src/esockd_acceptor_sup.erl @@ -33,7 +33,7 @@ -export([tune_socket/2]). %% Test --export([tune_socket_fun/1]). +-export([tune_socket_fun/2]). -define(ACCEPTOR_POOL, 16). @@ -46,7 +46,7 @@ start_supervised(ListenerRef = {Proto, ListenOn}) -> Type = esockd_server:get_listener_prop(ListenerRef, type), Opts = esockd_server:get_listener_prop(ListenerRef, options), - TuneFun = tune_socket_fun(Opts), + TuneFun = tune_socket_fun(Type, Opts), UpgradeFuns = upgrade_funs(Type, Opts), LimiterOpts = esockd_listener_sup:conn_limiter_opts(Opts, {listener, Proto, ListenOn}), Limiter = esockd_listener_sup:conn_rate_limiter(LimiterOpts), @@ -107,7 +107,7 @@ init({AcceptorMod, AcceptorArgs}) -> %% Internal functions %% ------------------------------------------------------------------- -tune_socket_fun(Opts) -> +tune_socket_fun(Type, Opts) -> Opts1 = case proplists:get_bool(tune_buffer, Opts) of true -> [{tune_buffer, true}]; diff --git a/src/esockd_server.erl b/src/esockd_server.erl index c4504c7..1b45982 100644 --- a/src/esockd_server.erl +++ b/src/esockd_server.erl @@ -98,7 +98,6 @@ del_stats({Protocol, ListenOn}) -> -spec ensure_stats({atom(), esockd:listen_on()}) -> ok. 
ensure_stats(StatsKey) -> Stats = [accepted, - closed_nostart, closed_overloaded, closed_rate_limitted, closed_other_reasons], diff --git a/test/esockd_SUITE.erl b/test/esockd_SUITE.erl index 3f041b8..d7746c3 100644 --- a/test/esockd_SUITE.erl +++ b/test/esockd_SUITE.erl @@ -527,7 +527,7 @@ t_tune_fun_overload(_) -> ?assertEqual(Socket, S), timer:sleep(10), Cnts = esockd_server:get_stats({Name, LPort}), - ?assertEqual(1, proplists:get_value(accepted, Cnts)), + ?assertEqual(0, proplists:get_value(accepted, Cnts)), ?assertEqual(1, proplists:get_value(closed_overloaded, Cnts)), %% Still possible to conenct afterwards {ok, _S} = gen_tcp:connect("127.0.0.1", LPort, [{active, true}]), diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index 8e12e23..5ac2da6 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -25,17 +25,15 @@ -define(PORT, 30000 + ?LINE). -define(COUNTER_ACCPETED, 1). --define(COUNTER_NOSTART, 2). --define(COUNTER_OVERLOADED, 3). --define(COUNTER_RATE_LIMITTED, 4). --define(COUNTER_OTHER_REASONSE, 5). +-define(COUNTER_OVERLOADED, 2). +-define(COUNTER_RATE_LIMITTED, 3). +-define(COUNTER_OTHER_REASONS, 4). -define(COUNTER_LAST, 10). counter_tag_to_index(accepted) -> ?COUNTER_ACCPETED; -counter_tag_to_index(closed_nostart) -> ?COUNTER_NOSTART; counter_tag_to_index(closed_overloaded) -> ?COUNTER_OVERLOADED; counter_tag_to_index(closed_rate_limitted) -> ?COUNTER_RATE_LIMITTED; -counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONSE. +counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONS. all() -> esockd_ct:all(?MODULE). 
@@ -68,7 +66,7 @@ start(PortNumber, Limiter, Opts) -> {nodelay, true}, {backlog, maps:get(backlog, Opts, 1024)}], {ok, ListenSocket} = gen_tcp:listen(PortNumber, SockOpts), - TuneFun = maps:get(tune_fun, Opts, esockd_acceptor_sup:tune_socket_fun([])), + TuneFun = maps:get(tune_fun, Opts, esockd_acceptor_sup:tune_socket_fun(tcp, [])), StartConn = {fun ?MODULE:start_connection/3, [Opts]}, {ok, AccPid} = esockd_acceptor:start_link(tcp, PortNumber, StartConn, TuneFun, _UpFuns = [], Limiter, ListenSocket), #{lsock => ListenSocket, acceptor => AccPid}. @@ -127,7 +125,7 @@ t_rate_limitted(Config) -> end. %% Failed to spawn new connection process -t_overloaded(Config) -> +t_error_when_spawn(Config) -> Port = ?PORT, Server = start(Port, no_limit(), #{start_connection_result => {error, overloaded}}), {ok, Sock1} = connect(Port), @@ -139,12 +137,12 @@ t_overloaded(Config) -> end. %% Failed to tune the socket opts -t_nostart(Config) -> +t_einval(Config) -> Port = ?PORT, Server = start(Port, no_limit(), #{tune_fun => {fun(_) -> {error, einval} end, []}}), {ok, Sock1} = connect(Port), try - ok = wait_for_counter(Config, ?COUNTER_NOSTART, 1, 2000), + ok = wait_for_counter(Config, ?COUNTER_OTHER_REASONS, 1, 2000), disconnect(Sock1) after stop(Server) From 129da5bf71f8b99f80c132407e530f387d6b3197 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 19:28:29 +0100 Subject: [PATCH 13/21] refactor: delete dead-code esslaccept error reason has been deleted since OTP 16 --- src/esockd_acceptor.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index db3be77..776baca 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -280,10 +280,9 @@ handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> handle_socket_error(closed, State, _StateName) -> {stop, normal, State}; %% {error, econnaborted} -> accept -%% {error, esslaccept} -> accept -handle_socket_error(Reason, State, suspending) 
when Reason =:= econnaborted; Reason =:= esslaccept -> +handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted -> {keep_state, State, {next_event, internal, accept_and_close}}; -handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept -> +handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. From 2adb465077a4dffd8da82af7764ee97e2a97482a Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 21:23:16 +0100 Subject: [PATCH 14/21] chore: fix typos --- src/esockd_acceptor.erl | 11 +++++------ src/esockd_server.erl | 2 +- test/esockd_acceptor_SUITE.erl | 4 ++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 776baca..3cd830e 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -174,7 +174,7 @@ handle_event( State = #state{lsock = LSock, accept_ref = Ref} ) -> _ = close(Sock), - inc_stats(State, rate_limitted), + inc_stats(State, rate_limited), NextEvent = {next_event, internal, accept_and_close}, {keep_state, State#state{accept_ref = no_ref}, NextEvent}; handle_event( @@ -186,7 +186,7 @@ handle_event( {next_event, internal, {accept, Sock}}}; {pause, PauseTime, Limiter2} -> _ = close(Sock), - inc_stats(State, rate_limitted), + inc_stats(State, rate_limited), start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( @@ -243,8 +243,7 @@ terminate(normal, _StateName, #state{}) -> terminate(shutdown, _StateName, #state{}) -> ok; terminate(Reason, _StateName, #state{}) -> - logger:log(error, #{msg => "esockd_acceptor_terminating", - reaseon => Reason}), + logger:log(error, #{msg => "esockd_acceptor_terminating", reason => Reason}), ok. 
code_change(_OldVsn, StateName, State, _Extra) -> @@ -256,7 +255,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> close(Sock) -> try - %% port-close leads to a TPC reset which cuts out the tcp graceful close overheads + %% port-close leads to a TCP reset which cuts out the tcp graceful close overheads _ = port_close(Sock), receive {'EXIT', Sock, _} -> ok after 1 -> ok end catch @@ -320,7 +319,7 @@ counter(accepted) -> accepted; counter(emfile) -> closed_overloaded; counter(enfile) -> closed_overloaded; counter(overloaded) -> closed_overloaded; -counter(rate_limitted) -> closed_rate_limitted; +counter(rate_limited) -> closed_rate_limited; counter(_) -> closed_other_reasons. start_connection(ConnSup, Sock, UpgradeFuns) when is_pid(ConnSup) -> diff --git a/src/esockd_server.erl b/src/esockd_server.erl index 1b45982..fac2049 100644 --- a/src/esockd_server.erl +++ b/src/esockd_server.erl @@ -99,7 +99,7 @@ del_stats({Protocol, ListenOn}) -> ensure_stats(StatsKey) -> Stats = [accepted, closed_overloaded, - closed_rate_limitted, + closed_rate_limited, closed_other_reasons], ok = ?MODULE:init_stats(StatsKey, Stats), ok. diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index 5ac2da6..440f9d8 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -26,13 +26,13 @@ -define(PORT, 30000 + ?LINE). -define(COUNTER_ACCPETED, 1). -define(COUNTER_OVERLOADED, 2). --define(COUNTER_RATE_LIMITTED, 3). +-define(COUNTER_RATE_LIMITED, 3). -define(COUNTER_OTHER_REASONS, 4). -define(COUNTER_LAST, 10). counter_tag_to_index(accepted) -> ?COUNTER_ACCPETED; counter_tag_to_index(closed_overloaded) -> ?COUNTER_OVERLOADED; -counter_tag_to_index(closed_rate_limitted) -> ?COUNTER_RATE_LIMITTED; +counter_tag_to_index(closed_rate_limited) -> ?COUNTER_RATE_LIMITED; counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONS. all() -> esockd_ct:all(?MODULE). 
From 0e685acf8e1f3c0348d876ea41e5a2901be2b663 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 21:25:49 +0100 Subject: [PATCH 15/21] chore: no_ref -> ?NOREF --- src/esockd_acceptor.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 3cd830e..e21be3d 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -50,6 +50,8 @@ code_change/4 ]). +-define(NOREF, noref). + -record(state, { proto :: atom(), listen_on :: esockd:listen_on(), @@ -60,7 +62,7 @@ upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), conn_sup :: pid() | {function(), list()}, - accept_ref = no_ref :: term() + accept_ref = ?NOREF :: term() }). %% @doc Start an acceptor @@ -121,10 +123,10 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> }, {next_event, internal, begin_waiting}}. -handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref -> +handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= ?NOREF -> %% started waiting in suspending state keep_state_and_data; -handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) -> +handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = ?NOREF}) -> case async_accept(LSock) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; @@ -166,7 +168,7 @@ handle_event( State = #state{lsock = LSock, accept_ref = Ref} ) -> NextEvent = {next_event, internal, {token_request, Sock}}, - {next_state, token_request, State#state{accept_ref = no_ref}, NextEvent}; + {next_state, token_request, State#state{accept_ref = ?NOREF}, NextEvent}; handle_event( info, {inet_async, LSock, Ref, {ok, Sock}}, @@ -176,7 +178,7 @@ handle_event( _ = close(Sock), inc_stats(State, rate_limited), NextEvent = {next_event, internal, accept_and_close}, - {keep_state, 
State#state{accept_ref = no_ref}, NextEvent}; + {keep_state, State#state{accept_ref = ?NOREF}, NextEvent}; handle_event( internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter} ) -> @@ -230,7 +232,7 @@ handle_event( State = #state{lsock = LSock, accept_ref = Ref} ) -> inc_stats(State, Reason), - handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName); + handle_socket_error(Reason, State#state{accept_ref = ?NOREF}, StateName); handle_event(Type, Content, StateName, _) -> logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", state_name => StateName, From 1814bac6ccea2fcb7e052a2855096a93da01300b Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 22:12:56 +0100 Subject: [PATCH 16/21] chore: delay handling of emfile when system limit is reached, it does not make sense to continue accepting new connections aggressively --- src/esockd_acceptor.erl | 76 ++++++++++++++++------- src/esockd_server.erl | 1 + test/esockd_acceptor_SUITE.erl | 107 +++++++++++++++++++++++++++------ test/esockd_test_lib.erl | 24 ++++++++ 4 files changed, 166 insertions(+), 42 deletions(-) create mode 100644 test/esockd_test_lib.erl diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index e21be3d..7a12878 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -52,6 +52,9 @@ -define(NOREF, noref). +%% time to suspend if system-limit reached (emfile/enfile) +-define(SYS_LIMIT_SUSPEND_MS, 1000). 
+ -record(state, { proto :: atom(), listen_on :: esockd:listen_on(), @@ -127,16 +130,9 @@ handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Re %% started waiting in suspending state keep_state_and_data; handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = ?NOREF}) -> - case async_accept(LSock) of + case async_accept(waiting, LSock) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; - {error, Reason} when - Reason =:= emfile; - Reason =:= enfile - -> - inc_stats(State, Reason), - log_system_limit(State, Reason), - start_suspending(State, 1000); {error, econnaborted} -> inc_stats(State, econnaborted), {next_state, waiting, State, {next_event, internal, begin_waiting}}; @@ -146,15 +142,11 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, acc {stop, Reason, State} end; handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> - case async_accept(LSock) of + case async_accept(suspending, LSock) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; - {error, Reason} when - Reason =:= emfile; - Reason =:= enfile; - Reason =:= econnaborted - -> - inc_stats(State, Reason), + {error, connaborted} -> + inc_stats(State, connaborted), {keep_state_and_data, {next_event, internal, accept_and_close}}; {error, closed} -> {stop, normal, State}; @@ -189,7 +181,7 @@ handle_event( {pause, PauseTime, Limiter2} -> _ = close(Sock), inc_stats(State, rate_limited), - start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) + enter_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( internal, @@ -285,14 +277,12 @@ handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted -> {keep_state, State, {next_event, internal, accept_and_close}}; handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; -%% emfile: The 
per-process limit of open file descriptors has been reached. -%% enfile: The system limit on the total number of open files has been reached. handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> log_system_limit(State, Reason), {keep_state, State, {next_event, internal, accept_and_close}}; handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile -> log_system_limit(State, Reason), - start_suspending(State, 1000); + enter_suspending(State, ?SYS_LIMIT_SUSPEND_MS); handle_socket_error(Reason, State, _StateName) -> {stop, Reason, State}. @@ -307,19 +297,28 @@ log_system_limit(State, Reason) -> listener => esockd:format(State#state.sockname), cause => explain_posix(Reason)}). -start_suspending(State, Timeout) -> +enter_suspending(State, Timeout) -> Actions = [{next_event, internal, accept_and_close}, {state_timeout, Timeout, begin_waiting}], {next_state, suspending, State, Actions}. inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) -> Counter = counter(Tag), + case Counter of + closed_sys_limit -> + %% slow down when system limit reached + %% to avoid flooding the error logs + %% also, it makes no sense to drain backlog in such condition + timer:sleep(100); + _ -> + ok + end, _ = esockd_server:inc_stats({Proto, ListenOn}, Counter, 1), ok. counter(accepted) -> accepted; -counter(emfile) -> closed_overloaded; -counter(enfile) -> closed_overloaded; +counter(emfile) -> closed_sys_limit; +counter(enfile) -> closed_sys_limit; counter(overloaded) -> closed_overloaded; counter(rate_limited) -> closed_rate_limited; counter(_) -> closed_other_reasons. @@ -330,5 +329,36 @@ start_connection({F, A}, Sock, UpgradeFuns) when is_function(F) -> %% only in tests so far apply(F, A ++ [Sock, UpgradeFuns]). -async_accept(LSock) -> +%% To throttle system-limit error logs, +%% if system limit reached, slow down the state machine by +%% trunning the immediate return to a delayed async message. 
+%% The first delayed message should cause acceptor to enter suspending state. +%% Then it should continue to accept 10 more sockets (which are all likely +%% to result in emfile error anyway) during suspending state. +async_accept(Currentstate, LSock) -> + case do_async_accept(LSock) of + {error, Reason} when Reason =:= emfile orelse Reason =:= enfile -> + Delay = case Currentstate of + suspending -> + ?SYS_LIMIT_SUSPEND_MS div 10; + _Waiting -> + 0 + end, + Ref = make_ref(), + Msg = {inet_async, LSock, Ref, {error, Reason}}, + _ = erlang:send_after(Delay, self(), Msg), + {ok, Ref}; + Other -> + Other + end. + +%% prim_inet is a sticky module. +%% delegate the call to esockd_test_lib in tests +%% so it can be mockde. +-ifndef(TEST). +do_async_accept(LSock) -> prim_inet:async_accept(LSock, -1). +-else. +do_async_accept(LSock) -> + esockd_test_lib:async_accept(LSock). +-endif. diff --git a/src/esockd_server.erl b/src/esockd_server.erl index fac2049..20572ba 100644 --- a/src/esockd_server.erl +++ b/src/esockd_server.erl @@ -98,6 +98,7 @@ del_stats({Protocol, ListenOn}) -> -spec ensure_stats({atom(), esockd:listen_on()}) -> ok. ensure_stats(StatsKey) -> Stats = [accepted, + closed_sys_limit, closed_overloaded, closed_rate_limited, closed_other_reasons], diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index 440f9d8..17ca22b 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -27,10 +27,12 @@ -define(COUNTER_ACCPETED, 1). -define(COUNTER_OVERLOADED, 2). -define(COUNTER_RATE_LIMITED, 3). --define(COUNTER_OTHER_REASONS, 4). +-define(COUNTER_SYS_LIMIT, 4). +-define(COUNTER_OTHER_REASONS, 5). -define(COUNTER_LAST, 10). 
counter_tag_to_index(accepted) -> ?COUNTER_ACCPETED; +counter_tag_to_index(closed_sys_limit) -> ?COUNTER_SYS_LIMIT; counter_tag_to_index(closed_overloaded) -> ?COUNTER_OVERLOADED; counter_tag_to_index(closed_rate_limited) -> ?COUNTER_RATE_LIMITED; counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONS. @@ -45,7 +47,7 @@ end_per_suite(_Config) -> init_per_testcase(_Case, Config) -> Counters = counters:new(?COUNTER_LAST, []), - meck:new(esockd_server, [passthrough, no_history]), + meck:new(esockd_server, [passthrough, no_history, no_sticky]), meck:expect(esockd_server, inc_stats, fun(_, Tag, Count) -> Index = counter_tag_to_index(Tag), @@ -115,7 +117,7 @@ t_rate_limitted(Config) -> ct:fail({unexpected, Other}) end end, Socks), - ok = wait_for_counter(Config, ?COUNTER_RATE_LIMITTED, Count, 2000), + ok = wait_for_counter(Config, ?COUNTER_RATE_LIMITED, Count, 2000), timer:sleep(Pause), {ok, Sock2} = connect(Port), ok = wait_for_counter(Config, ?COUNTER_ACCPETED, 1, 2000), @@ -148,6 +150,57 @@ t_einval(Config) -> stop(Server) end. +%% It is not possible to trigger a real emfile error while keeping +%% the Erlang VM healthy (test case may need to write files etc), +%% so we use meck to simulate one. 
+t_sys_limit(Config) -> + meck:new(esockd_test_lib, [passthrough, no_history]), + meck:expect(esockd_test_lib, async_accept, fun(_) -> {error, emfile} end), + Port = ?PORT, + Server = start(Port, no_limit()), + try + %% acceptor to enter suspending state after started + %% because async_accept always returns {error, emfile} + ok = wait_for_counter(Config, ?COUNTER_SYS_LIMIT, {'>', 1}, 2000), + %% now unload the mock + meck:unload(esockd_test_lib), + %% this one is closed immediately because acceptor is still in suspending state + {ok, Sock1} = connect(Port), + ok = wait_for_counter(Config, ?COUNTER_RATE_LIMITED, 1, 2000), + ok = assert_socket_disconnected(Sock1), + %% allow acceptor to exit from suspending state + timer:sleep(1000), + {ok, Sock2} = connect(Port), + ok = wait_for_counter(Config, ?COUNTER_ACCPETED, 1, 2000), + ok = assert_socket_connected(Sock2), + disconnect(Sock2) + after + stop(Server) + end. + +assert_socket_connected(Sock) -> + ok = inet:setopts(Sock, [{active, true}]), + receive + Msg -> + error({unexpected, Msg}) + after + 10 -> + ok + end. + +assert_socket_disconnected(Sock) -> + ok = inet:setopts(Sock, [{active, true}]), + receive + {tcp_closed, Sock} -> + ok; + Other -> + error({unexpected, Other}) + after + 100 -> + error(timeout) + end, + ok. + disconnect(Socket) -> port_close(Socket), ok. @@ -186,30 +239,46 @@ wait_for_counter(Config, Index, Count, Timeout) -> Counters = proplists:get_value(counters, Config), Now = now_ts(), Deadline = Now + Timeout, - do_wait_for_counter(Counters, Index, Count, Deadline). + try + do_wait_for_counter(Counters, Index, Count, Deadline) + catch throw : ok -> + ok + end. 
do_wait_for_counter(Counters, Index, Count, Deadline) -> - case counters:get(Counters, Index) of - Count -> + Value = counters:get(Counters, Index), + Match = match_counter(Value, Count), + case Match of + true -> + throw(ok); + false -> ok; - Other when Other > Count -> + error -> error(#{cause => counter_exceeded_expect, expected => Count, counter_index => Index, - got => Other}); - Other -> - case now_ts() > Deadline of - true -> - error(#{cause => timeout, - expected => Count, - counter_index=> Index, - got => Other}); - false -> - timer:sleep(100), - do_wait_for_counter(Counters, Index, Count, Deadline) - end + got => Value}) + end, + case now_ts() > Deadline of + true -> + error(#{cause => timeout, + expected => Count, + counter_index=> Index, + got => Value}); + false -> + timer:sleep(100), + do_wait_for_counter(Counters, Index, Count, Deadline) end. +match_counter(Value, {'>', Expect}) -> + Value > Expect; +match_counter(Value, Value) -> + true; +match_counter(Value, Expect) when Value > Expect -> + error; +match_counter(_Vlaue, _Expect) -> + false. + %% dummy callback to start connection start_connection(Opts, _Sock, _UpgradeFuns) -> case maps:get(start_connection_result, Opts, undefined) of diff --git a/test/esockd_test_lib.erl b/test/esockd_test_lib.erl new file mode 100644 index 0000000..3f01f98 --- /dev/null +++ b/test/esockd_test_lib.erl @@ -0,0 +1,24 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_test_lib). + +-export([async_accept/1]). + +%% prim_inet call delegation for meck +%% because prim_inet is a sticky module +async_accept(LSock) -> + prim_inet:async_accept(LSock, -1). From b8fce2a0952d92cd45b0d669ba1d1dd189075c9c Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 23:28:08 +0100 Subject: [PATCH 17/21] test: cover listener socket close --- src/esockd_acceptor.erl | 6 ++++-- test/esockd_acceptor_SUITE.erl | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 7a12878..b7b3403 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -232,9 +232,11 @@ handle_event(Type, Content, StateName, _) -> event_content => Content}), keep_state_and_data. -terminate(normal, _StateName, #state{}) -> +terminate(normal, _StateName, _) -> ok; -terminate(shutdown, _StateName, #state{}) -> +terminate(shutdown, _StateName, _) -> + ok; +terminate({shutdown, _}, _StateName, _) -> ok; terminate(Reason, _StateName, #state{}) -> logger:log(error, #{msg => "esockd_acceptor_terminating", reason => Reason}), diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index 17ca22b..17694a1 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -178,6 +178,30 @@ t_sys_limit(Config) -> stop(Server) end. 
+t_close_listener_socket_cause_acceptor_stop(_Config) -> + Port = ?PORT, + #{acceptor := Acceptor, lsock := LSock} = start(Port, no_limit()), + Mref = monitor(process, Acceptor), + unlink(Acceptor), + unlink(LSock), + {ok, Sock1} = connect(Port), + ok = assert_socket_connected(Sock1), + exit(LSock, kill), + receive + {'DOWN', Mref, process, Acceptor, Reason} -> + ?assertEqual(normal, Reason) + after + 1000 -> + error(timeout) + end, + receive + {tcp_closed, Sock1} -> + ok + after + 1000 -> + error(timeout) + end. + assert_socket_connected(Sock) -> ok = inet:setopts(Sock, [{active, true}]), receive From 10475ca4d67a9d09973190b0336f8e8eacfb3bbb Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 11 Nov 2024 23:39:09 +0100 Subject: [PATCH 18/21] test: delete unused function --- test/esockd_acceptor_SUITE.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index 17694a1..6762d6b 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -253,10 +253,6 @@ consume(_Token, #{name := pause_then_allow} = Limiter) -> consume(_Token, #{name := no_limit} = Limiter) -> {ok, Limiter}. -%% inspect during tests -get_pd_counter(Tag) -> - get({counter, Tag}). - now_ts() -> erlang:system_time(millisecond). 
wait_for_counter(Config, Index, Count, Timeout) -> From 69ffe5afd318e01cac2f5adafe4522cc662e390a Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 12 Nov 2024 13:51:40 +0100 Subject: [PATCH 19/21] test: unstick prim_inet for meck --- src/esockd_acceptor.erl | 17 +++-------------- test/esockd_acceptor_SUITE.erl | 6 +++--- test/esockd_test_lib.erl | 24 ------------------------ 3 files changed, 6 insertions(+), 41 deletions(-) delete mode 100644 test/esockd_test_lib.erl diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index b7b3403..99e5e49 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -145,8 +145,8 @@ handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSoc case async_accept(suspending, LSock) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; - {error, connaborted} -> - inc_stats(State, connaborted), + {error, econnaborted} -> + inc_stats(State, econnaborted), {keep_state_and_data, {next_event, internal, accept_and_close}}; {error, closed} -> {stop, normal, State}; @@ -338,7 +338,7 @@ start_connection({F, A}, Sock, UpgradeFuns) when is_function(F) -> %% Then it should continue to accept 10 more sockets (which are all likely %% to result in emfile error anyway) during suspending state. async_accept(Currentstate, LSock) -> - case do_async_accept(LSock) of + case prim_inet:async_accept(LSock, -1) of {error, Reason} when Reason =:= emfile orelse Reason =:= enfile -> Delay = case Currentstate of suspending -> @@ -353,14 +353,3 @@ async_accept(Currentstate, LSock) -> Other -> Other end. - -%% prim_inet is a sticky module. -%% delegate the call to esockd_test_lib in tests -%% so it can be mockde. --ifndef(TEST). -do_async_accept(LSock) -> - prim_inet:async_accept(LSock, -1). --else. -do_async_accept(LSock) -> - esockd_test_lib:async_accept(LSock). --endif. 
diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index 6762d6b..39e0cce 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -154,8 +154,8 @@ t_einval(Config) -> %% the Erlang VM healthy (test case may need to write files etc), %% so we use meck to simulate one. t_sys_limit(Config) -> - meck:new(esockd_test_lib, [passthrough, no_history]), - meck:expect(esockd_test_lib, async_accept, fun(_) -> {error, emfile} end), + meck:new(prim_inet, [passthrough, no_history, unstick]), + meck:expect(prim_inet, async_accept, fun(_, _) -> {error, emfile} end), Port = ?PORT, Server = start(Port, no_limit()), try @@ -163,7 +163,7 @@ t_sys_limit(Config) -> %% because async_accept always returns {error, emfile} ok = wait_for_counter(Config, ?COUNTER_SYS_LIMIT, {'>', 1}, 2000), %% now unload the mock - meck:unload(esockd_test_lib), + meck:unload(prim_inet), %% this one is closed immediately because acceptor is still in suspending state {ok, Sock1} = connect(Port), ok = wait_for_counter(Config, ?COUNTER_RATE_LIMITED, 1, 2000), diff --git a/test/esockd_test_lib.erl b/test/esockd_test_lib.erl deleted file mode 100644 index 3f01f98..0000000 --- a/test/esockd_test_lib.erl +++ /dev/null @@ -1,24 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. 
-%%-------------------------------------------------------------------- - --module(esockd_test_lib). - --export([async_accept/1]). - -%% prim_inet call delegation for meck -%% because prim_inet is a sticky module -async_accept(LSock) -> - prim_inet:async_accept(LSock, -1). From 2b96fb7b53c5fb5595d5f79b088269a4056784ca Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 12 Nov 2024 21:40:45 +0100 Subject: [PATCH 20/21] fix: ignore econnreset error reason for error level logging --- src/esockd_acceptor.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 99e5e49..f8317db 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -261,6 +261,8 @@ close(Sock) -> eval_tune_socket_fun({Fun, Args1}, Sock) -> apply(Fun, [Sock | Args1]). +handle_accept_error(econnreset, _, _) -> + ok; handle_accept_error(enotconn, _, _) -> ok; handle_accept_error(einval, _, _) -> From 661c947d3b4de9ccc649aaa53af346f2455e3d5c Mon Sep 17 00:00:00 2001 From: zmstone Date: Wed, 13 Nov 2024 13:56:26 +0100 Subject: [PATCH 21/21] chore: fix a typo in code comment --- src/esockd_acceptor.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index f8317db..7471953 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -335,7 +335,7 @@ start_connection({F, A}, Sock, UpgradeFuns) when is_function(F) -> %% To throttle system-limit error logs, %% if system limit reached, slow down the state machine by -%% trunning the immediate return to a delayed async message. +%% turning the immediate return to a delayed async message. %% The first delayed message should cause acceptor to enter suspending state. %% Then it should continue to accept 10 more sockets (which are all likely %% to result in emfile error anyway) during suspending state.