From 709d3bb8e3bfe0bf27e68fa871aa34a136b38ae1 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 8 Jul 2024 10:41:22 +0800 Subject: [PATCH] feat: add content-sensitive proxy behaviour for UDP --- include/esockd_proxy.hrl | 63 +++ src/esockd_sup.erl | 14 +- .../doc/proxy.plantuml} | 0 .../router.png => udp_proxy/doc/proxy.png} | Bin src/udp_proxy/esockd_udp_proxy.erl | 292 ++++++++++++++ src/udp_proxy/esockd_udp_proxy_connection.erl | 67 ++++ src/udp_proxy/esockd_udp_proxy_db.erl | 127 ++++++ src/udp_router/esockd_udp_router.erl | 361 ------------------ src/udp_router/esockd_udp_router_db.erl | 91 ----- src/udp_router/esockd_udp_router_monitor.erl | 217 ----------- src/udp_router/esockd_udp_router_sup.erl | 112 ------ 11 files changed, 559 insertions(+), 785 deletions(-) create mode 100644 include/esockd_proxy.hrl rename src/{udp_router/doc/router.plantuml => udp_proxy/doc/proxy.plantuml} (100%) rename src/{udp_router/doc/router.png => udp_proxy/doc/proxy.png} (100%) create mode 100644 src/udp_proxy/esockd_udp_proxy.erl create mode 100644 src/udp_proxy/esockd_udp_proxy_connection.erl create mode 100644 src/udp_proxy/esockd_udp_proxy_db.erl delete mode 100644 src/udp_router/esockd_udp_router.erl delete mode 100644 src/udp_router/esockd_udp_router_db.erl delete mode 100644 src/udp_router/esockd_udp_router_monitor.erl delete mode 100644 src/udp_router/esockd_udp_router_sup.erl diff --git a/include/esockd_proxy.hrl b/include/esockd_proxy.hrl new file mode 100644 index 0000000..4926957 --- /dev/null +++ b/include/esockd_proxy.hrl @@ -0,0 +1,63 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(ESOCKD_PROXY_HRL). +-define(ESOCKD_PROXY_HRL, true). + +-define(SSL_TRANSPORT, esockd_transport). +-define(PROXY_TRANSPORT, esockd_udp_proxy). + +-type proxy_id() :: pid(). +-type socket_packet() :: binary(). +-type socket() :: inet:socket() | ssl:sslsocket(). + +-type transport() :: {udp, pid(), socket()} | ?SSL_TRANSPORT. +-type proxy_transport() :: {?PROXY_TRANSPORT, pid(), socket()}. +-type address() :: {inet:ip_address(), inet:port_number()}. +-type peer() :: socket() | address(). + +-type connection_module() :: atom(). +-type connection_state() :: term(). +-type connection_packet() :: term(). + +-type connection_id() :: + peer() + | integer() + | string() + | binary(). + +-type proxy_packet() :: + {?PROXY_TRANSPORT, proxy_id(), binary(), connection_packet()}. + +%% Routing information search results + +%% send raw socket packet +-type get_connection_id_result() :: + %% send decoded packet + {ok, connection_id(), connection_packet(), connection_state()} + | invalid. + +-type connection_options() :: #{ + esockd_proxy_opts := proxy_options(), + atom() => term() +}. + +-type proxy_options() :: #{ + connection_mod := connection_module(), + heartbeat => non_neg_integer() +}. + +-endif. diff --git a/src/esockd_sup.erl b/src/esockd_sup.erl index 5565a58..704993d 100644 --- a/src/esockd_sup.erl +++ b/src/esockd_sup.erl @@ -53,7 +53,7 @@ start_link() -> child_spec(Proto, ListenOn, Opts) when is_atom(Proto) -> ListenerRef = {Proto, ListenOn}, _ = esockd_server:set_listener_prop(ListenerRef, type, tcp), - _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), + _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), #{id => child_id(Proto, ListenOn), start => {esockd_listener_sup, start_link, [Proto, ListenOn]}, restart => transient, @@ -76,7 +76,7 @@ udp_child_spec(Proto, Port, Opts) when is_atom(Proto) -> dtls_child_spec(Proto, Port, Opts) when is_atom(Proto) -> ListenerRef = {Proto, Port}, _ = esockd_server:set_listener_prop(ListenerRef, type, dtls), - _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), + _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), #{id => child_id(Proto, Port), start => {esockd_listener_sup, start_link, [Proto, Port]}, restart => transient, @@ -191,5 +191,11 @@ init([]) -> type => worker, modules => [esockd_server] }, - {ok, {SupFlags, [Limiter, Server]}}. - + ProxyDB = #{id => esockd_udp_proxy_db, + start => {esockd_udp_proxy_db, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [esockd_udp_proxy_db] + }, + {ok, {SupFlags, [Limiter, Server, ProxyDB]}}. diff --git a/src/udp_router/doc/router.plantuml b/src/udp_proxy/doc/proxy.plantuml similarity index 100% rename from src/udp_router/doc/router.plantuml rename to src/udp_proxy/doc/proxy.plantuml diff --git a/src/udp_router/doc/router.png b/src/udp_proxy/doc/proxy.png similarity index 100% rename from src/udp_router/doc/router.png rename to src/udp_proxy/doc/proxy.png diff --git a/src/udp_proxy/esockd_udp_proxy.erl b/src/udp_proxy/esockd_udp_proxy.erl new file mode 100644 index 0000000..48bf15f --- /dev/null +++ b/src/udp_proxy/esockd_udp_proxy.erl @@ -0,0 +1,292 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_udp_proxy). + +-behaviour(gen_server). + +-include("include/esockd_proxy.hrl"). + +%% API +-export([start_link/3, send/2, close/1]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-export_type([connection_options/0]). + +-define(NOW, erlang:system_time(second)). +-define(ERROR_MSG(Format, Args), + error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args]) +). +-define(DEF_HEARTBEAT, 60). + +-type timespan() :: non_neg_integer(). + +%%-------------------------------------------------------------------- +%% Definitions +%%-------------------------------------------------------------------- + +-type state() :: #{ + connection_mod := connection_module(), + connection_id := connection_id() | undefined, + connection_state := connection_state(), + connection_options := connection_options(), + %% last source's connection active time + last_time := pos_integer(), + transport := proxy_transport(), + peer := peer() +}. + +%%-------------------------------------------------------------------- +%%- API +%%-------------------------------------------------------------------- + +start_link(Transport, Peer, Opts) -> + gen_server:start_link(?MODULE, [Transport, Peer, Opts], []). + +-spec send(proxy_id(), binary()) -> ok. +send(ProxyId, Data) -> + gen_server:cast(ProxyId, {send, Data}). + +close(ProxyId) -> + case erlang:is_process_alive(ProxyId) of + true -> + gen_server:call(ProxyId, close); + _ -> + ok + end. + +%%-------------------------------------------------------------------- +%%- gen_server callbacks +%%-------------------------------------------------------------------- + +init([Transport, Peer, #{esockd_proxy_opts := Opts} = COpts]) -> + #{connection_mod := Mod} = Opts, + heartbeat(maps:get(heartbeat, Opts, ?DEF_HEARTBEAT)), + init_transport(Transport, Peer, #{ + last_time => ?NOW, + connection_mod => Mod, + connection_options => COpts, + connection_state => esockd_udp_proxy_connection:initialize(Mod, COpts), + connection_id => undefined + }). + +handle_call(close, _From, State) -> + {stop, {shutdown, close_transport}, ok, State}; +handle_call(Request, _From, State) -> + ?ERROR_MSG("Unexpected call: ~p", [Request]), + {reply, ok, State}. + +handle_cast({send, Data}, #{transport := Transport, peer := Peer} = State) -> + case send(Transport, Peer, Data) of + ok -> + {noreply, State}; + {error, Reason} -> + ?ERROR_MSG("Send failed, Reason: ~0p", [Reason]), + {stop, {sock_error, Reason}, State} + end; +handle_cast(Request, State) -> + ?ERROR_MSG("Unexpected cast: ~p", [Request]), + {noreply, State}. + +handle_info({datagram, _SockPid, Data}, State) -> + {noreply, handle_incoming(Data, State)}; +handle_info({ssl, _Socket, Data}, State) -> + {noreply, handle_incoming(Data, State)}; +handle_info({heartbeat, Span}, #{last_time := LastTime} = State) -> + Now = ?NOW, + case Now - LastTime > Span of + true -> + {stop, normal, State}; + _ -> + heartbeat(Span), + {noreply, State} + end; +handle_info({ssl_error, _Sock, Reason}, State) -> + {stop, Reason, socket_exit(State)}; +handle_info({ssl_closed, _Sock}, State) -> + {stop, ssl_closed, socket_exit(State)}; +handle_info( + {'DOWN', _, process, _, _Reason}, + State +) -> + {stop, {shutdown, connection_closed}, State}; +handle_info(Info, State) -> + ?ERROR_MSG("Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(Reason, #{transport := Transport} = State) -> + close_transport(Transport), + Clear = + case Reason of + close_transport -> + false; + connection_closed -> + false; + _ -> + true + end, + detach(State, Clear). + +%%-------------------------------------------------------------------- +%%- Internal functions +%%-------------------------------------------------------------------- +-spec handle_incoming(socket_packet(), state()) -> state(). +handle_incoming( + Data, + #{transport := Transport, peer := Peer, connection_mod := Mod, connection_state := CState} = + State +) -> + State2 = State#{last_time := ?NOW}, + case esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, CState, Data) of + {ok, CId, Packet, CState2} -> + dispatch(Mod, CId, Data, Packet, State2#{connection_state := CState2}); + invalid -> + ?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [ + Transport, Peer, Mod + ]), + State2 + end. + +-spec dispatch( + connection_module(), + esockd_transport:socket(), + connection_id(), + connection_packet(), + state() +) -> + state(). +dispatch( + Mod, + CId, + Data, + Packet, + #{ + transport := Transport, + peer := Peer, + connection_state := CState, + connection_options := Opts + } = + State +) -> + case lookup(Mod, Transport, Peer, CId, Opts) of + {ok, Pid} -> + esockd_udp_proxy_connection:dispatch( + Mod, Pid, CState, {Transport, Data, Packet} + ), + attach(CId, State); + {error, Reason} -> + ?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]), + State + end. + +-spec attach(connection_id(), state()) -> state(). +attach(CId, #{connection_mod := Mod, connection_id := undefined} = State) -> + esockd_udp_proxy_db:attach(Mod, CId), + State#{connection_id := CId}; +attach(CId, #{connection_id := OldId} = State) when CId =/= OldId -> + State2 = detach(State), + attach(CId, State2); +attach(_CId, State) -> + State. + +-spec detach(state()) -> state(). +detach(State) -> + detach(State, true). + +-spec detach(state(), boolean()) -> state(). +detach(#{connection_id := undefined} = State, _Clear) -> + State; +detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) -> + case esockd_udp_proxy_db:detach(Mod, CId) of + {Clear, Pid} -> + case erlang:is_process_alive(Pid) of + true -> + esockd_udp_proxy_connection:close(Mod, Pid, CState); + _ -> + ok + end; + _ -> + ok + end, + State#{connection_id := undefined}. + +-spec socket_exit(state()) -> state(). +socket_exit(State) -> + detach(State). + +-spec heartbeat(timespan()) -> ok. +heartbeat(Span) -> + erlang:send_after(timer:seconds(Span), self(), {?FUNCTION_NAME, Span}), + ok. + +-spec lookup( + connection_module(), + proxy_transport(), + peer(), + connection_id(), + connection_options() +) -> {ok, pid()} | {error, Reason :: term()}. +lookup(Mod, Transport, Peer, CId, Opts) -> + case esockd_udp_proxy_db:lookup(Mod, CId) of + {ok, _} = Ok -> + Ok; + undefined -> + case esockd_udp_proxy_connection:create(Mod, Transport, Peer, Opts) of + {ok, Pid} -> + esockd_udp_proxy_db:insert(Mod, CId, Pid), + _ = erlang:monitor(process, Pid), + {ok, Pid}; + ignore -> + {error, ignore}; + Error -> + Error + end + end. + +-spec send(proxy_transport(), peer(), binary()) -> _. +send({?PROXY_TRANSPORT, _, Socket}, {IP, Port}, Data) when is_port(Socket) -> + gen_udp:send(Socket, IP, Port, Data); +send({?PROXY_TRANSPORT, _, Socket}, _Peer, Data) -> + esockd_transport:send(Socket, Data). + +init_transport({udp, _, Sock}, Peer, State) -> + {ok, State#{ + transport => {?PROXY_TRANSPORT, self(), Sock}, + peer => Peer + }}; +init_transport(esockd_transport, Sock, State) -> + case esockd_transport:wait(Sock) of + {ok, NSock} -> + {ok, State#{ + transport => {?PROXY_TRANSPORT, self(), NSock}, + peer => esockd_transport:peername(NSock) + }}; + Error -> + Error + end. + +close_transport({?PROXY_TRANSPORT, _, Sock}) when is_port(Sock) -> + ok; +close_transport({?PROXY_TRANSPORT, _, Sock}) -> + esockd_transport:fast_close(Sock). diff --git a/src/udp_proxy/esockd_udp_proxy_connection.erl b/src/udp_proxy/esockd_udp_proxy_connection.erl new file mode 100644 index 0000000..64e0d1e --- /dev/null +++ b/src/udp_proxy/esockd_udp_proxy_connection.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_udp_proxy_connection). + +-include("include/esockd_proxy.hrl"). + +-export([ + initialize/2, + create/4, + get_connection_id/5, + dispatch/4, + close/3 +]). + +-export_type([connection_id/0, connection_module/0]). + +%%-------------------------------------------------------------------- +%%- Callbacks +%%-------------------------------------------------------------------- +-callback initialize(connection_options()) -> connection_state(). + +%% Create new connection +-callback create(transport(), peer(), connection_options()) -> gen_server:start_ret(). + +%% Find routing information +-callback get_connection_id( + transport(), peer(), connection_state(), socket_packet() +) -> + get_connection_id_result(). + +%% Dispacth message +-callback dispatch(pid(), connection_state(), proxy_packet()) -> ok. + +%% Close Connection +-callback close(pid(), connection_state()) -> ok. + +%%-------------------------------------------------------------------- +%%- API +%%-------------------------------------------------------------------- +initialize(Mod, Opts) -> + Mod:initialize(Opts). + +create(Mod, Transport, Peer, Opts) -> + Mod:create(Transport, Peer, Opts). + +get_connection_id(Mod, Transport, Peer, State, Data) -> + Mod:get_connection_id(Transport, Peer, State, Data). + +dispatch(Mod, Pid, State, Packet) -> + Mod:dispatch(Pid, State, Packet). + +close(Mod, Pid, State) -> + Mod:close(Pid, State). diff --git a/src/udp_proxy/esockd_udp_proxy_db.erl b/src/udp_proxy/esockd_udp_proxy_db.erl new file mode 100644 index 0000000..d8a5dba --- /dev/null +++ b/src/udp_proxy/esockd_udp_proxy_db.erl @@ -0,0 +1,127 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_udp_proxy_db). + +-behaviour(gen_server). + +-include("include/esockd_proxy.hrl"). + +%% API +-export([ + start_link/0, + insert/3, + attach/2, + detach/2, + lookup/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-define(ID(Mod, CId), {Mod, CId}). + +-record(connection, { + id :: ?ID(connection_module(), connection_id()), + %% the connection pid + pid :: pid(), + %% Reference Counter + count :: non_neg_integer() +}). + +-define(TAB, esockd_udp_proxy_db). +-define(MINIMUM_VAL, -2147483647). + +%%-------------------------------------------------------------------- +%%- API +%%-------------------------------------------------------------------- +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec insert(connection_module(), connection_id(), pid()) -> boolean(). +insert(Mod, CId, Pid) -> + ets:insert_new(?TAB, #connection{ + id = ?ID(Mod, CId), + pid = Pid, + count = 0 + }). + +-spec attach(connection_module(), connection_id()) -> integer(). +attach(Mod, CId) -> + ets:update_counter(?TAB, ?ID(Mod, CId), {#connection.count, 1}). + +-spec detach(connection_module(), connection_id()) -> {Clear :: true, connection_state()} | false. +detach(Mod, CId) -> + Id = ?ID(Mod, CId), + RC = ets:update_counter(?TAB, Id, {#connection.count, -1, 0, ?MINIMUM_VAL}), + if + RC < 0 -> + case ets:lookup(?TAB, Id) of + [#connection{pid = Pid}] -> + ets:delete(?TAB, Id), + {true, Pid}; + _ -> + false + end; + true -> + false + end. + +-spec lookup(connection_module(), connection_id()) -> {ok, pid()} | undefined. +lookup(Mod, CId) -> + case ets:lookup(?TAB, ?ID(Mod, CId)) of + [#connection{pid = Pid}] -> + {ok, Pid}; + _ -> + undefined + end. + +%%-------------------------------------------------------------------- +%%- gen_server callbacks +%%-------------------------------------------------------------------- +init([]) -> + ?TAB = ets:new(?TAB, [ + set, + public, + named_table, + {keypos, #connection.id}, + {write_concurrency, true}, + {read_concurrency, true} + ]), + {ok, #{}}. + +handle_call(Req, _From, State) -> + error_logger:error_msg("Unexpected call: ~p", [Req]), + {reply, ignore, State}. + +handle_cast(Msg, State) -> + error_logger:error_msg("Unexpected cast: ~p~n", [Msg]), + {noreply, State}. + +handle_info(Info, State) -> + error_logger:error_msg("Unexpected info: ~p~n", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ets:delete(?TAB), + ok. diff --git a/src/udp_router/esockd_udp_router.erl b/src/udp_router/esockd_udp_router.erl deleted file mode 100644 index 46cf7a4..0000000 --- a/src/udp_router/esockd_udp_router.erl +++ /dev/null @@ -1,361 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(esockd_udp_router). - --behaviour(gen_server). - -%% API --export([start_link/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, format_status/2]). - --export_type([sourceid/0, router_module/0, router_options/0]). - --define(NOW, erlang:system_time(second)). --define(SSL_TRANSPORT, esockd_transport). --define(ERROR_MSG(Format, Args), - error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])). --define(DEF_HEARTBEAT, 60). - -%%-------------------------------------------------------------------- -%% Definitions -%%-------------------------------------------------------------------- --record(state, - {behaviour :: router_module(), - source :: socket(), - attachs :: #{sourceid => pos_integer()} %% last source's connection active time - }). - --type state() :: #state{}. - --type router_module() :: atom(). --type socket_packet() :: binary(). --type app_packet() :: term(). --type socket() :: inet:socket() | ssl:sslsocket(). - --type router_packet() :: {datagram, sourceinfo(), socket_packet()} - | {ssl, socket(), socket_packet()} - | {incoming, app_packet()}. - --type transport() :: {udp, pid(), socket()} | ?SSL_TRANSPORT. --type sourceinfo() :: {inet:ip_address(), inet:port_number()}. --type peer() :: socket() | sourceinfo(). --type sourceid() :: peer() - | integer() - | string() - | binary(). - -%% Routing information search results --type routing_find_result() :: sourceid() %% send raw socket packet - | {sourceid(), app_packet()} %% send decoded packet - | invalid. - --type timespan() :: non_neg_integer(). %% second, 0 = infinity --type router_options() :: #{behaviour := router_module(), - heartbeat => timespan()}. - -%%-------------------------------------------------------------------- -%%- Callbacks -%%-------------------------------------------------------------------- -%% Create new connection --callback create(transport(), peer()) -> {ok, pid()}. - -%% Find routing information --callback find(transport(), peer(), socket(), socket_packet()) -> - routing_find_result(). - -%% Dispacth message --callback dispatch(router_packet()) -> ok. - -%% Close Connection --callback close(pid()) -> ok. - -%%-------------------------------------------------------------------- -%%- API -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% @end -%%-------------------------------------------------------------------- --spec start_link(socket(), router_options()) -> {ok, Pid :: pid()} | - {error, Error :: {already_started, pid()}} | - {error, Error :: term()} | - ignore. -start_link(Socket, Opts) -> - gen_server:start_link(?MODULE, [Socket, Opts], []). - -%%-------------------------------------------------------------------- -%%- gen_server callbacks -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% @end -%%-------------------------------------------------------------------- --spec init(Args :: term()) -> {ok, State :: term()} | - {ok, State :: term(), Timeout :: timeout()} | - {ok, State :: term(), hibernate} | - {stop, Reason :: term()} | - ignore. -init([Socket, #{behaviour := Behaviour} = Opts]) -> - parse_opts(Opts), - {ok, #state{source = Socket, - behaviour = Behaviour, - attachs = #{}}}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% @end -%%-------------------------------------------------------------------- --spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> - {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | - {reply, Reply :: term(), NewState :: term(), hibernate} | - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: term()} | - {stop, Reason :: term(), NewState :: term()}. - -handle_call(Request, _From, State) -> - ?ERROR_MSG("Unexpected call: ~p", [Request]), - {reply, ok, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% @end -%%-------------------------------------------------------------------- --spec handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), NewState :: term()}. -handle_cast(Request, State) -> - ?ERROR_MSG("Unexpected cast: ~p", [Request]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% @end -%%-------------------------------------------------------------------- --spec handle_info(Info :: timeout() | term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: normal | term(), NewState :: term()}. -handle_info({udp, Socket, IP, Port, Data}, State) -> - Transport = {udp, self(), Socket}, - Peer = {IP, Port}, - {noreply, - handle_incoming(Transport, Peer, Socket, Data, State)}; - -handle_info({ssl, Socket, Data}, State) -> - {noreply, - handle_incoming(?SSL_TRANSPORT, Socket, Socket, Data, State)}; - -handle_info({heartbeat, Span}, #state{behaviour = Behaviour, attachs = Attachs} = State) -> - heartbeat(Span), - Now = ?NOW, - Self = self(), - Attachs2 = maps:fold(fun(Sid, LastActive, Acc) -> - if Now - LastActive > Span -> - esockd_udp_router_db:delete_in(Behaviour, Self, Sid), - Acc; - true -> - Acc#{Sid => LastActive} - end - end, - #{}, - Attachs), - State2 = State#state{attachs = Attachs2}, - case Attachs2 of - #{} -> - {stop, normal, State2}; - _ -> - {noreply, State2} - end; - -handle_info({ssl_error, _Sock, Reason}, State) -> - {stop, Reason, socket_exit(State)}; - -handle_info({ssl_closed, _Sock}, State) -> - {stop, ssl_closed, socket_exit(State)}; - -handle_info(Info, State) -> - ?ERROR_MSG("Unexpected info: ~p", [Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% @end -%%-------------------------------------------------------------------- --spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), - State :: term()) -> any(). -terminate(_Reason, #state{behaviour = Behaviour, - attachs = Attachs}) -> - Self = self(), - _ = maps:fold(fun(Sid, _, _) -> - esockd_udp_router_db:delete_in(Behaviour, Self, Sid) - end, - ok, - Attachs), - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% @end -%%-------------------------------------------------------------------- --spec code_change(OldVsn :: term() | {down, term()}, - State :: term(), - Extra :: term()) -> {ok, NewState :: term()} | - {error, Reason :: term()}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called for changing the form and appearance -%% of gen_server status when it is returned from sys:get_status/1,2 -%% or when it appears in termination error logs. -%% @end -%%-------------------------------------------------------------------- --spec format_status(Opt :: normal | terminate, - Status :: list()) -> Status :: term(). -format_status(_Opt, Status) -> - Status. - -%%-------------------------------------------------------------------- -%%- Internal functions -%%-------------------------------------------------------------------- --spec handle_incoming(transport(), peer(), socket(), socket_packet(), state()) -> state(). -handle_incoming(Transport, Peer, Socket, Data, #state{behaviour = Behaviour} = State) -> - case Behaviour:find(Transport, Peer, Socket, Data) of - {ok, Sid} -> - Tag = raw_packet_tag(Transport), - dispatch(Transport, Peer, Behaviour, Sid, {Tag, Peer, Data}, State); - {ok, Sid, AppPacket} -> - dispatch(Transport, Peer, Behaviour, Sid, {incoming, AppPacket}, State); - invalid -> - State - end. - --spec dispatch(transport(), peer(), router_module(), sourceid(), router_packet(), state()) -> state(). -dispatch(Transport, Peer, Behaviour, Sid, Packet, State) -> - {ok, Pid} = - case esockd_udp_router_db:lookup(Sid) of - {ok, Dest} -> - safe_attach(Transport, Peer, Sid, Dest, State); - undefined -> - Result = esockd_udp_router_monitor:create(Transport, Peer, Sid, Behaviour), - unsafe_attach(Sid, State), - Result - end, - dispatch(Behaviour, Sid, Pid, Packet, State). - --spec dispatch(router_module(), sourceid(), pid(), router_packet(), state()) -> state(). -dispatch(Behaviour, Sid, Pid, Packet, #state{attachs = Attachs} = State) -> - Behaviour:dispatch(Pid, Packet), - State#state{attachs = Attachs#{Sid => ?NOW}}. - --spec raw_packet_tag(transport()) -> datagram | ssl. -raw_packet_tag(?SSL_TRANSPORT) -> ssl; -raw_packet_tag(_) -> datagram. - --spec unsafe_attach(sourceid(), state()) -> state(). -unsafe_attach(Sid, #state{attachs = Attachs}) -> - case maps:is_key(Sid, Attachs) of - false -> - esockd_udp_router_db:insert_in(self(), Sid), - ok; - _ -> - ok - end. - --spec safe_attach(transport(), peer(), sourceid(), pid(), state()) -> {ok, pid(), state()}. -safe_attach(Transport, Peer, Sid, Dest, #state{behaviour = Behaviour, - attachs = Attachs}) -> - case maps:is_key(Sid, Attachs) of - true -> - {ok, Dest}; - false -> - Result = esockd_udp_router_db:insert_in(self(), Sid), - if Result > 0 -> - %% the connection are exists, but maybe not Dest - %%(some router close the old, and some create new, when this router doing attach) - case esockd_udp_router_db:lookup(Sid) of - {ok, RDest} -> - {ok, RDest}; - undefined -> - esockd_udp_router_monitor:create(Transport, Peer, Sid, Behaviour) - end; - true -> - %% when this router attach Sid's connection - %% ohter router closed this connection, so we need create new - %% or maybe we should just throw a exception? - esockd_udp_router_monitor:create(Transport, Peer, Sid, Behaviour) - end - end. - --spec socket_exit(state()) -> state(). -socket_exit(#state{behaviour = Behaviour, attachs = Attachs} = State) -> - _ = maps:fold(fun(Sid, _, _) -> - esockd_udp_router_db:delete_in(Behaviour, self(), Sid) - end, - ok, - Attachs), - State#state{attachs = #{}}. - --spec heartbeat(timespan()) -> ok. -heartbeat(Span) -> - erlang:send_after(self(), timer:seconds(Span), {?FUNCTION_NAME, Span}), - ok. - -parse_opts(Opts) -> - lists:foreach(fun(Opt) -> - parse_opt(Opt, Opts) - end, - [heartbeat]). - -parse_opt(heartbeat, #{heartbeat := Span}) -> - if Span -> - heartbeat(Span); - true -> - ok - end; - -parse_opt(heartbeat, _) -> - heartbeat(?DEF_HEARTBEAT). diff --git a/src/udp_router/esockd_udp_router_db.erl b/src/udp_router/esockd_udp_router_db.erl deleted file mode 100644 index 249078b..0000000 --- a/src/udp_router/esockd_udp_router_db.erl +++ /dev/null @@ -1,91 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(esockd_udp_router_db). - -%% API --export([ insert/2, insert_in/2, delete_in/3 - , delete_out/1, lookup/1, start/0]). - --record(in, { source :: pid() %% the router pid - , sid :: sourceid()}). - --record(out, { sid :: sourceid() - , destination :: pid() %% the connection pid - , count :: non_neg_integer()}). %% Reference Counter - --define(TAB, esockd_udp_router_db). --define(MINIMUM_VAL, -2147483647). - --type sourceid() :: esockd_udp_router:sourceid(). --type router_module() :: esockd_udp_router:router_mdoule(). - -%%-------------------------------------------------------------------- -%%- API -%%-------------------------------------------------------------------- -start() -> - ets:new(?TAB, [ set, public, named_table - , {keypos, #out.sid} - , {write_concurrency, true} - , {read_concurrency, true}]). - --spec insert(sourceid(), pid()) -> boolean(). -insert(Sid, Destination) -> - ets:insert_new(?TAB, #out{sid = Sid, - destination = Destination, - count = 1}). - --spec insert_in(pid(), sourceid()) -> integer(). -insert_in(Source, Sid) -> - ets:insert(?TAB, #in{source = Source, sid = Sid}), %% junk ? - ets:update_counter(?TAB, Sid, {#out.count, 1}, 1). - --spec delete_in(router_module(), pid(), sourceid()) -> ok. -delete_in(Behaviour, Source, Sid) -> - ets:delete(?TAB, Source), %% XXX maybe shodule this ? - RC = ets:update_counter(?TAB, Sid, {#out.count, -1, 0, ?MINIMUM_VAL}, 0), - if RC < 0 -> - Pid = ets:lookup_element(?TAB, Sid, #out.sid), - Behaviour:close(Pid), - ets:delete(?TAB, Sid), - ok; - true -> - ok - end. - --spec delete_out(sourceid()) -> ok. -delete_out(Sid) -> - ets:delete(?TAB, Sid), - ok. - --spec lookup(sourceid()) -> {ok, pid()} | undefined. -lookup(Sid) -> - case ets:lookup(?TAB, Sid) of - [#out{destination = Pid}] -> - {ok, Pid}; - _ -> - undefined - end. - -%%-------------------------------------------------------------------- -%% @doc -%% @spec -%% @end -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%%- Internal functions -%%-------------------------------------------------------------------- diff --git a/src/udp_router/esockd_udp_router_monitor.erl b/src/udp_router/esockd_udp_router_monitor.erl deleted file mode 100644 index 4d15360..0000000 --- a/src/udp_router/esockd_udp_router_monitor.erl +++ /dev/null @@ -1,217 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(esockd_udp_router_monitor). - --behaviour(gen_server). - -%% API --export([start_link/1, - create/4]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, format_status/2]). - --record(state, {pool_id :: pool_id(), - monitors :: #{reference() => {router_module(), sourceid()}}}). - --type pool_id() :: non_neg_integer(). --type router_module() :: esockd_udp_router:router_module(). --type sourceid() :: esockd_udp_router:sourceid(). - --define(POOL, ?MODULE). --define(PNAME(Id), {?POOL, Id}). --define(ERROR_MSG(Format, Args), - error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])). - -%%-------------------------------------------------------------------- -%%- API -%%-------------------------------------------------------------------- -create(Transport, Peer, Sid, Behaviour) -> - %% Use affinity to resolve potential concurrent conflicts - call(Sid, {?FUNCTION_NAME, Transport, Peer, Sid, Behaviour}). - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% @end -%%-------------------------------------------------------------------- --spec start_link(pool_id()) -> {ok, Pid :: pid()} | - {error, Error :: {already_started, pid()}} | - {error, Error :: term()} | - ignore. -start_link(Id) -> - gen_server:start_link({local, proc_name(Id)}, ?MODULE, [Id], [{hibernate_afterr, 1000}]). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% @end -%%-------------------------------------------------------------------- --spec init(Args :: term()) -> {ok, State :: term()} | - {ok, State :: term(), Timeout :: timeout()} | - {ok, State :: term(), hibernate} | - {stop, Reason :: term()} | - ignore. -init([PoolId]) -> - gproc_pool:connect_worker(?POOL, ?PNAME(PoolId)), - {ok, #state{pool_id = PoolId, - monitors = #{}}}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% @end -%%-------------------------------------------------------------------- --spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) -> - {reply, Reply :: term(), NewState :: term()} | - {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} | - {reply, Reply :: term(), NewState :: term(), hibernate} | - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: term()} | - {stop, Reason :: term(), NewState :: term()}. - -handle_call({create, Transport, Peer, Sid, Behaviour}, _From, - #state{monitors = Monitors} = State) -> - case esockd_udp_router_db:lookup(Sid) of - {ok, Pid} -> - {reply, {ok, Pid}, State}; - _ -> - {ok, Pid} = Behaviour:create(Transport, Peer), - Ref = erlang:monitor(process, Pid), - true = esockd_udp_router_db:insert(Sid, Pid), - {reply, {ok, Pid}, State#state{monitors = Monitors#{Ref => {Behaviour, Sid}}}} - end; - -handle_call(Request, _From, State) -> - ?ERROR_MSG("Unexpected call: ~p", [Request]), - {reply, ok, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% @end -%%-------------------------------------------------------------------- --spec handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: term(), NewState :: term()}. -handle_cast(Request, State) -> - ?ERROR_MSG("Unexpected cast: ~p", [Request]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% @end -%%-------------------------------------------------------------------- --spec handle_info(Info :: timeout() | term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), Timeout :: timeout()} | - {noreply, NewState :: term(), hibernate} | - {stop, Reason :: normal | term(), NewState :: term()}. - -%% connection closed -handle_info({'DOWN', Ref, process, _, _}, - #state{monitors = Monitors} = State) -> - case maps:get(Ref, Monitors, undefined) of - undefined -> - {noreply, State}; - {_Behaviour, Sid} -> - Monitors2 = maps:remove(Ref, Monitors), - esockd_udp_router_db:delete_out(Sid), - {noreply, State#state{monitors = Monitors2}} - end; - -handle_info(Info, State) -> - ?ERROR_MSG("Unexpected info: ~p", [Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% @end -%%-------------------------------------------------------------------- --spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), - State :: term()) -> any(). -terminate(_Reason, #state{pool_id = PoolId, - monitors = Monitors}) -> - gproc_pool:disconnect_worker(?POOL, ?PNAME(PoolId)), - _ = maps:fold(fun(Ref, {Behaviour, Sid}, _) -> - erlang:demonitor(Ref), - case esockd_udp_router_db:lookup(Sid) of - undefined -> - ok; - {ok, Pid} -> - Behaviour:close(Pid), - esockd_udp_router_db:delete_out(Sid) - end - end, ok, Monitors), - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% @end -%%-------------------------------------------------------------------- --spec code_change(OldVsn :: term() | {down, term()}, - State :: term(), - Extra :: term()) -> {ok, NewState :: term()} | - {error, Reason :: term()}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called for changing the form and appearance -%% of gen_server status when it is returned from sys:get_status/1,2 -%% or when it appears in termination error logs. -%% @end -%%-------------------------------------------------------------------- --spec format_status(Opt :: normal | terminate, - Status :: list()) -> Status :: term(). -format_status(_Opt, Status) -> - Status. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- --spec proc_name(pool_id()) -> atom(). -proc_name(Id) -> - list_to_atom(lists:concat([?POOL, "_", Id])). - --spec call(sourceid(), term()) -> ok. -call(Sid, Req) -> - MPid = gproc_pool:pick_worker(?POOL, Sid), - gen_server:call(MPid, Req). diff --git a/src/udp_router/esockd_udp_router_sup.erl b/src/udp_router/esockd_udp_router_sup.erl deleted file mode 100644 index 4487ff0..0000000 --- a/src/udp_router/esockd_udp_router_sup.erl +++ /dev/null @@ -1,112 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(esockd_udp_router_sup). - --behaviour(supervisor). - -%% API --export([ start_link/0 - , ensure_start/0]). - -%% Supervisor callbacks --export([init/1]). - -%% shared with esockd_udp_router_monitor --define(POOL, esockd_udp_monitor). --define(WORKER_NAME(Id), {?POOL, Id}). - -%%%------------------------------------------------------------------- -%%- API functions -%%-------------------------------------------------------------------- -ensure_start() -> - case erlang:whereis(?MODULE) of - undefined -> - Spec = #{id => ?MODULE, - restart => transient, - shutdown => infinity, - type => supervisor, - modules => [?MODULE]}, - esockd_sup:start_child(Spec), - ok; - _ -> - ok - end. - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the supervisor -%% @end -%%-------------------------------------------------------------------- --spec start_link() -> {ok, Pid :: pid()} | - {error, {already_started, Pid :: pid()}} | - {error, {shutdown, term()}} | - {error, term()} | - ignore. -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%%-------------------------------------------------------------------- -%%- Supervisor callbacks -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Whenever a supervisor is started using supervisor:start_link/[2,3], -%% this function is called by the new process to find out about -%% restart strategy, maximum restart intensity, and child -%% specifications. -%% @end -%%-------------------------------------------------------------------- --spec init(Args :: term()) -> - {ok, {SupFlags :: supervisor:sup_flags(), - [ChildSpec :: supervisor:child_spec()]}} | - ignore. -init([]) -> - Size = erlang:system_info(schedulers) * 2, - ok = ensure_pool(Size), - - SupFlags = #{strategy => one_for_one, - intensity => 10, - period => 3600}, - - AChilds = [begin - ensure_pool_worker(I), - #{id => ?WORKER_NAME(I), - start => {?POOL, start_link, [I]}, - restart => transient, - shutdown => 5000, - type => worker, - modules => [?POOL]} - end || I <- lists:seq(1, Size)], - - {ok, {SupFlags, [AChilds]}}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- -ensure_pool(Size) -> - try gproc_pool:new(?POOL, hash, [{size, Size}]) - catch - error:exists -> ok - end. - -ensure_pool_worker(Id) -> - try gproc_pool:add_worker(?POOL, ?WORKER_NAME(Id), Id) - catch - error:exists -> ok - end.