-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: enhance connection rate limit #196
Changes from all commits
cf15a53
e704f70
d5d5f26
f500702
35040c0
9510e4c
f66c1aa
3d2bc5d
8234417
e1e44b0
44673da
18c494a
129da5b
2adb465
0e685ac
1814bac
b8fce2a
10475ca
69ffe5a
2b96fb7
661c947
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
%%-------------------------------------------------------------------- | ||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. | ||
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. | ||
%% | ||
%% Licensed under the Apache License, Version 2.0 (the "License"); | ||
%% you may not use this file except in compliance with the License. | ||
|
@@ -50,6 +50,11 @@ | |
code_change/4 | ||
]). | ||
|
||
-define(NOREF, noref). | ||
|
||
%% time to suspend if system-limit reached (emfile/enfile) | ||
-define(SYS_LIMIT_SUSPEND_MS, 1000). | ||
|
||
-record(state, { | ||
proto :: atom(), | ||
listen_on :: esockd:listen_on(), | ||
|
@@ -59,8 +64,8 @@ | |
tune_fun :: esockd:sock_fun(), | ||
upgrade_funs :: [esockd:sock_fun()], | ||
conn_limiter :: undefined | esockd_generic_limiter:limiter(), | ||
conn_sup :: pid(), | ||
accept_ref :: term() | ||
conn_sup :: pid() | {function(), list()}, | ||
accept_ref = ?NOREF :: term() | ||
}). | ||
|
||
%% @doc Start an acceptor | ||
|
@@ -121,21 +126,31 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> | |
}, | ||
{next_event, internal, begin_waiting}}. | ||
|
||
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> | ||
case prim_inet:async_accept(LSock, -1) of | ||
handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= ?NOREF -> | ||
%% started waiting in suspending state | ||
keep_state_and_data; | ||
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = ?NOREF}) -> | ||
case async_accept(waiting, LSock) of | ||
{ok, Ref} -> | ||
{keep_state, State#state{accept_ref = Ref}}; | ||
{error, Reason} when | ||
Reason =:= emfile; | ||
Reason =:= enfile | ||
-> | ||
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; | ||
{error, econnaborted} -> | ||
inc_stats(State, econnaborted), | ||
{next_state, waiting, State, {next_event, internal, begin_waiting}}; | ||
{error, closed} -> | ||
{stop, normal, State}; | ||
{error, Reason} -> | ||
error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]), | ||
{stop, Reason, State} | ||
end; | ||
handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> | ||
case async_accept(suspending, LSock) of | ||
{ok, Ref} -> | ||
{keep_state, State#state{accept_ref = Ref}}; | ||
{error, econnaborted} -> | ||
inc_stats(State, econnaborted), | ||
{keep_state_and_data, {next_event, internal, accept_and_close}}; | ||
{error, closed} -> | ||
{stop, normal, State}; | ||
{error, Reason} -> | ||
{stop, Reason, State} | ||
end; | ||
handle_event( | ||
|
@@ -144,25 +159,35 @@ handle_event( | |
waiting, | ||
State = #state{lsock = LSock, accept_ref = Ref} | ||
) -> | ||
{next_state, token_request, State, {next_event, internal, {token_request, Sock}}}; | ||
NextEvent = {next_event, internal, {token_request, Sock}}, | ||
{next_state, token_request, State#state{accept_ref = ?NOREF}, NextEvent}; | ||
handle_event( | ||
info, | ||
{inet_async, LSock, Ref, {ok, Sock}}, | ||
suspending, | ||
State = #state{lsock = LSock, accept_ref = Ref} | ||
) -> | ||
_ = close(Sock), | ||
inc_stats(State, rate_limited), | ||
NextEvent = {next_event, internal, accept_and_close}, | ||
{keep_state, State#state{accept_ref = ?NOREF}, NextEvent}; | ||
handle_event( | ||
internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter} | ||
internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter} | ||
) -> | ||
case esockd_generic_limiter:consume(1, Limiter) of | ||
{ok, Limiter2} -> | ||
{next_state, accepting, State#state{conn_limiter = Limiter2}, | ||
{next_event, internal, {accept, Sock}}}; | ||
{pause, PauseTime, Limiter2} -> | ||
{next_state, suspending, State#state{conn_limiter = Limiter2}, | ||
{state_timeout, PauseTime, Content}} | ||
_ = close(Sock), | ||
inc_stats(State, rate_limited), | ||
enter_suspending(State#state{conn_limiter = Limiter2}, PauseTime) | ||
end; | ||
handle_event( | ||
internal, | ||
{accept, Sock}, | ||
accepting, | ||
State = #state{ | ||
proto = Proto, | ||
listen_on = ListenOn, | ||
sockmod = SockMod, | ||
tune_fun = TuneFun, | ||
upgrade_funs = UpgradeFuns, | ||
|
@@ -171,48 +196,50 @@ handle_event( | |
) -> | ||
%% make it look like gen_tcp:accept | ||
inet_db:register_socket(Sock, SockMod), | ||
|
||
%% Inc accepted stats. | ||
esockd_server:inc_stats({Proto, ListenOn}, accepted, 1), | ||
|
||
case eval_tune_socket_fun(TuneFun, Sock) of | ||
{ok, Sock} -> | ||
case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of | ||
{ok, NewSock} -> | ||
case start_connection(ConnSup, NewSock, UpgradeFuns) of | ||
{ok, _Pid} -> | ||
%% Inc accepted stats. | ||
inc_stats(State, accepted), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why move to here? I think the old code looks fine. the socket is indeed accepted isn't it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there was previously no clear definition what now it means: accepted from the listener backlog, and successfully started an owner process. |
||
ok; | ||
{error, Reason} -> | ||
handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State), | ||
close(Sock) | ||
handle_accept_error(Reason, "failed_to_start_connection_process", State), | ||
close(NewSock), | ||
inc_stats(State, Reason) | ||
end; | ||
{error, Reason} -> | ||
handle_accept_error(Reason, "Tune buffer failed on ~s: ~s", State), | ||
close(Sock) | ||
%% the socket became invalid before | ||
%% starting the owner process | ||
close(Sock), | ||
inc_stats(State, Reason) | ||
end, | ||
{next_state, waiting, State, {next_event, internal, begin_waiting}}; | ||
handle_event(state_timeout, {token_request, _} = Content, suspending, State) -> | ||
{next_state, token_request, State, {next_event, internal, Content}}; | ||
handle_event(state_timeout, begin_waiting, suspending, State) -> | ||
{next_state, waiting, State, {next_event, internal, begin_waiting}}; | ||
handle_event( | ||
info, | ||
{inet_async, LSock, Ref, {error, Reason}}, | ||
_, | ||
StateName, | ||
State = #state{lsock = LSock, accept_ref = Ref} | ||
) -> | ||
handle_socket_error(Reason, State); | ||
inc_stats(State, Reason), | ||
handle_socket_error(Reason, State#state{accept_ref = ?NOREF}, StateName); | ||
handle_event(Type, Content, StateName, _) -> | ||
error_logger:warning_msg( | ||
"Unhandled message, State:~p, Type:~p Content:~p", | ||
[StateName, Type, Content] | ||
), | ||
logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", | ||
state_name => StateName, | ||
event_type => Type, | ||
event_content => Content}), | ||
keep_state_and_data. | ||
|
||
terminate(normal, _StateName, #state{}) -> | ||
terminate(normal, _StateName, _) -> | ||
ok; | ||
terminate(shutdown, _StateName, _) -> | ||
ok; | ||
terminate(shutdown, _StateName, #state{}) -> | ||
terminate({shutdown, _}, _StateName, _) -> | ||
ok; | ||
terminate(Reason, _StateName, #state{}) -> | ||
error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]), | ||
logger:log(error, #{msg => "esockd_acceptor_terminating", reason => Reason}), | ||
ok. | ||
|
||
code_change(_OldVsn, StateName, State, _Extra) -> | ||
|
@@ -224,7 +251,8 @@ code_change(_OldVsn, StateName, State, _Extra) -> | |
|
||
close(Sock) -> | ||
try | ||
true = port_close(Sock), | ||
%% port-close leads to a TCP reset which cuts out the tcp graceful close overheads | ||
_ = port_close(Sock), | ||
receive {'EXIT', Sock, _} -> ok after 1 -> ok end | ||
catch | ||
error:_ -> ok | ||
|
@@ -233,36 +261,97 @@ close(Sock) -> | |
eval_tune_socket_fun({Fun, Args1}, Sock) -> | ||
apply(Fun, [Sock | Args1]). | ||
|
||
handle_accept_error(econnreset, _, _) -> | ||
ok; | ||
handle_accept_error(enotconn, _, _) -> | ||
ok; | ||
handle_accept_error(einval, _, _) -> | ||
ok; | ||
handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) -> | ||
esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), | ||
handle_accept_error(overloaded, _, _) -> | ||
ok; | ||
handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> | ||
error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]). | ||
logger:log(error, #{msg => Msg, | ||
listener => esockd:format(Sockname), | ||
cause => Reason}). | ||
|
||
handle_socket_error(closed, State) -> | ||
handle_socket_error(closed, State, _StateName) -> | ||
{stop, normal, State}; | ||
%% {error, econnaborted} -> accept | ||
%% {error, esslaccept} -> accept | ||
%% {error, esslaccept} -> accept | ||
handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept -> | ||
handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted -> | ||
{keep_state, State, {next_event, internal, accept_and_close}}; | ||
handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted -> | ||
{next_state, waiting, State, {next_event, internal, begin_waiting}}; | ||
%% emfile: The per-process limit of open file descriptors has been reached. | ||
%% enfile: The system limit on the total number of open files has been reached. | ||
%% enfile: The system limit on the total number of open files has been reached. | ||
handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile -> | ||
error_logger:error_msg( | ||
"Accept error on ~s: ~s", | ||
[esockd:format(State#state.sockname), explain_posix(Reason)] | ||
), | ||
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; | ||
handle_socket_error(Reason, State) -> | ||
handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> | ||
log_system_limit(State, Reason), | ||
{keep_state, State, {next_event, internal, accept_and_close}}; | ||
handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile -> | ||
log_system_limit(State, Reason), | ||
enter_suspending(State, ?SYS_LIMIT_SUSPEND_MS); | ||
handle_socket_error(Reason, State, _StateName) -> | ||
{stop, Reason, State}. | ||
|
||
explain_posix(emfile) -> | ||
"EMFILE (Too many open files)"; | ||
explain_posix(enfile) -> | ||
"ENFILE (File table overflow)". | ||
|
||
log_system_limit(State, Reason) -> | ||
logger:log(critical, | ||
#{msg => "cannot_accept_more_connections", | ||
listener => esockd:format(State#state.sockname), | ||
cause => explain_posix(Reason)}). | ||
|
||
enter_suspending(State, Timeout) -> | ||
Actions = [{next_event, internal, accept_and_close}, | ||
{state_timeout, Timeout, begin_waiting}], | ||
{next_state, suspending, State, Actions}. | ||
|
||
inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) -> | ||
Counter = counter(Tag), | ||
case Counter of | ||
closed_sys_limit -> | ||
%% slow down when system limit reached | ||
%% to aovid flooding the error logs | ||
%% also, it makes no sense to drain backlog in such condition | ||
timer:sleep(100); | ||
_ -> | ||
ok | ||
end, | ||
_ = esockd_server:inc_stats({Proto, ListenOn}, Counter, 1), | ||
ok. | ||
|
||
counter(accepted) -> accepted; | ||
counter(emfile) -> closed_sys_limit; | ||
counter(enfile) -> closed_sys_limit; | ||
counter(overloaded) -> closed_overloaded; | ||
counter(rate_limited) -> closed_rate_limited; | ||
counter(_) -> closed_other_reasons. | ||
|
||
start_connection(ConnSup, Sock, UpgradeFuns) when is_pid(ConnSup) -> | ||
esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns); | ||
start_connection({F, A}, Sock, UpgradeFuns) when is_function(F) -> | ||
%% only in tests so far | ||
apply(F, A ++ [Sock, UpgradeFuns]). | ||
|
||
%% To throttle system-limit error logs, | ||
%% if system limit reached, slow down the state machine by | ||
%% turning the immediate return to a delayed async message. | ||
%% The first delayed message should cause acceptor to enter suspending state. | ||
%% Then it should continue to accept 10 more sockets (which are all likely | ||
%% to result in emfile error anyway) during suspending state. | ||
async_accept(Currentstate, LSock) -> | ||
case prim_inet:async_accept(LSock, -1) of | ||
{error, Reason} when Reason =:= emfile orelse Reason =:= enfile -> | ||
Delay = case Currentstate of | ||
suspending -> | ||
?SYS_LIMIT_SUSPEND_MS div 10; | ||
_Waiting -> | ||
0 | ||
end, | ||
Ref = make_ref(), | ||
Msg = {inet_async, LSock, Ref, {error, Reason}}, | ||
_ = erlang:send_after(Delay, self(), Msg), | ||
{ok, Ref}; | ||
Other -> | ||
Other | ||
end. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the socket is changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the socket does not change most of the time, but the API is designed to allow it to change.