diff --git a/include/esockd_proxy.hrl b/include/esockd_proxy.hrl new file mode 100644 index 0000000..4926957 --- /dev/null +++ b/include/esockd_proxy.hrl @@ -0,0 +1,63 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(ESOCKD_PROXY_HRL). +-define(ESOCKD_PROXY_HRL, true). + +-define(SSL_TRANSPORT, esockd_transport). +-define(PROXY_TRANSPORT, esockd_udp_proxy). + +-type proxy_id() :: pid(). +-type socket_packet() :: binary(). +-type socket() :: inet:socket() | ssl:sslsocket(). + +-type transport() :: {udp, pid(), socket()} | ?SSL_TRANSPORT. +-type proxy_transport() :: {?PROXY_TRANSPORT, pid(), socket()}. +-type address() :: {inet:ip_address(), inet:port_number()}. +-type peer() :: socket() | address(). + +-type connection_module() :: atom(). +-type connection_state() :: term(). +-type connection_packet() :: term(). + +-type connection_id() :: + peer() + | integer() + | string() + | binary(). + +-type proxy_packet() :: + {?PROXY_TRANSPORT, proxy_id(), binary(), connection_packet()}. + +%% Routing information search results + +%% send raw socket packet +-type get_connection_id_result() :: + %% send decoded packet + {ok, connection_id(), connection_packet(), connection_state()} + | invalid. + +-type connection_options() :: #{ + esockd_proxy_opts := proxy_options(), + atom() => term() +}. + +-type proxy_options() :: #{ + connection_mod := connection_module(), + heartbeat => non_neg_integer() +}. + +-endif. diff --git a/rebar.config b/rebar.config index 1b38b51..202b9f0 100644 --- a/rebar.config +++ b/rebar.config @@ -18,6 +18,10 @@ {src_dirs, ["src"]}. +{deps, + [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} + ]}. + {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. diff --git a/src/esockd_sup.erl b/src/esockd_sup.erl index 5565a58..704993d 100644 --- a/src/esockd_sup.erl +++ b/src/esockd_sup.erl @@ -53,7 +53,7 @@ start_link() -> child_spec(Proto, ListenOn, Opts) when is_atom(Proto) -> ListenerRef = {Proto, ListenOn}, _ = esockd_server:set_listener_prop(ListenerRef, type, tcp), - _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), + _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), #{id => child_id(Proto, ListenOn), start => {esockd_listener_sup, start_link, [Proto, ListenOn]}, restart => transient, @@ -76,7 +76,7 @@ udp_child_spec(Proto, Port, Opts) when is_atom(Proto) -> dtls_child_spec(Proto, Port, Opts) when is_atom(Proto) -> ListenerRef = {Proto, Port}, _ = esockd_server:set_listener_prop(ListenerRef, type, dtls), - _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), + _ = esockd_server:set_listener_prop(ListenerRef, options, Opts), #{id => child_id(Proto, Port), start => {esockd_listener_sup, start_link, [Proto, Port]}, restart => transient, @@ -191,5 +191,11 @@ init([]) -> type => worker, modules => [esockd_server] }, - {ok, {SupFlags, [Limiter, Server]}}. - + ProxyDB = #{id => esockd_udp_proxy_db, + start => {esockd_udp_proxy_db, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [esockd_udp_proxy_db] + }, + {ok, {SupFlags, [Limiter, Server, ProxyDB]}}. diff --git a/src/udp_proxy/doc/proxy.plantuml b/src/udp_proxy/doc/proxy.plantuml new file mode 100644 index 0000000..8062e05 --- /dev/null +++ b/src/udp_proxy/doc/proxy.plantuml @@ -0,0 +1,43 @@ +@startuml +skinparam dpi 300 +left to right direction + +frame client { + card clinet1 + card client2 + card client3 +} + +frame proxy { + label "proxy converts the udp connection\nto long connection in the application" + agent proxy1 + agent proxy2 + agent proxy3 +} + +frame connection { + card connection1 + card connection2 + card connection3 +} + +clinet1 .[#red].> proxy1 +client2 .[#blue].> proxy1 +client3 ..> proxy2 +client3 ..> proxy3 + +proxy1 -[#red]-> connection1 : client1 +proxy1 -[#blue]-> connection2 : client2 +proxy2 --> connection3 : client3 +proxy3 --> connection3 : client3 + +note bottom of proxy1 +different clients use the same UDP channel,e.g NAT, the proxy will first disconnect the existing connection and then connect to the new one +endnote + + +note right of proxy +proxy2 and proxy3 are multiple channels used by a client,e.g LB +endnote + +@enduml diff --git a/src/udp_proxy/doc/proxy.png b/src/udp_proxy/doc/proxy.png new file mode 100644 index 0000000..608a15d Binary files /dev/null and b/src/udp_proxy/doc/proxy.png differ diff --git a/src/udp_proxy/esockd_udp_proxy.erl b/src/udp_proxy/esockd_udp_proxy.erl new file mode 100644 index 0000000..48bf15f --- /dev/null +++ b/src/udp_proxy/esockd_udp_proxy.erl @@ -0,0 +1,292 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_udp_proxy). + +-behaviour(gen_server). + +-include("include/esockd_proxy.hrl"). + +%% API +-export([start_link/3, send/2, close/1]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-export_type([connection_options/0]). + +-define(NOW, erlang:system_time(second)). +-define(ERROR_MSG(Format, Args), + error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args]) +). +-define(DEF_HEARTBEAT, 60). + +-type timespan() :: non_neg_integer(). + +%%-------------------------------------------------------------------- +%% Definitions +%%-------------------------------------------------------------------- + +-type state() :: #{ + connection_mod := connection_module(), + connection_id := connection_id() | undefined, + connection_state := connection_state(), + connection_options := connection_options(), + %% last source's connection active time + last_time := pos_integer(), + transport := proxy_transport(), + peer := peer() +}. + +%%-------------------------------------------------------------------- +%%- API +%%-------------------------------------------------------------------- + +start_link(Transport, Peer, Opts) -> + gen_server:start_link(?MODULE, [Transport, Peer, Opts], []). + +-spec send(proxy_id(), binary()) -> ok. +send(ProxyId, Data) -> + gen_server:cast(ProxyId, {send, Data}). + +close(ProxyId) -> + case erlang:is_process_alive(ProxyId) of + true -> + gen_server:call(ProxyId, close); + _ -> + ok + end. + +%%-------------------------------------------------------------------- +%%- gen_server callbacks +%%-------------------------------------------------------------------- + +init([Transport, Peer, #{esockd_proxy_opts := Opts} = COpts]) -> + #{connection_mod := Mod} = Opts, + heartbeat(maps:get(heartbeat, Opts, ?DEF_HEARTBEAT)), + init_transport(Transport, Peer, #{ + last_time => ?NOW, + connection_mod => Mod, + connection_options => COpts, + connection_state => esockd_udp_proxy_connection:initialize(Mod, COpts), + connection_id => undefined + }). + +handle_call(close, _From, State) -> + {stop, {shutdown, close_transport}, ok, State}; +handle_call(Request, _From, State) -> + ?ERROR_MSG("Unexpected call: ~p", [Request]), + {reply, ok, State}. + +handle_cast({send, Data}, #{transport := Transport, peer := Peer} = State) -> + case send(Transport, Peer, Data) of + ok -> + {noreply, State}; + {error, Reason} -> + ?ERROR_MSG("Send failed, Reason: ~0p", [Reason]), + {stop, {sock_error, Reason}, State} + end; +handle_cast(Request, State) -> + ?ERROR_MSG("Unexpected cast: ~p", [Request]), + {noreply, State}. + +handle_info({datagram, _SockPid, Data}, State) -> + {noreply, handle_incoming(Data, State)}; +handle_info({ssl, _Socket, Data}, State) -> + {noreply, handle_incoming(Data, State)}; +handle_info({heartbeat, Span}, #{last_time := LastTime} = State) -> + Now = ?NOW, + case Now - LastTime > Span of + true -> + {stop, normal, State}; + _ -> + heartbeat(Span), + {noreply, State} + end; +handle_info({ssl_error, _Sock, Reason}, State) -> + {stop, Reason, socket_exit(State)}; +handle_info({ssl_closed, _Sock}, State) -> + {stop, ssl_closed, socket_exit(State)}; +handle_info( + {'DOWN', _, process, _, _Reason}, + State +) -> + {stop, {shutdown, connection_closed}, State}; +handle_info(Info, State) -> + ?ERROR_MSG("Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(Reason, #{transport := Transport} = State) -> + close_transport(Transport), + Clear = + case Reason of + close_transport -> + false; + connection_closed -> + false; + _ -> + true + end, + detach(State, Clear). + +%%-------------------------------------------------------------------- +%%- Internal functions +%%-------------------------------------------------------------------- +-spec handle_incoming(socket_packet(), state()) -> state(). +handle_incoming( + Data, + #{transport := Transport, peer := Peer, connection_mod := Mod, connection_state := CState} = + State +) -> + State2 = State#{last_time := ?NOW}, + case esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, CState, Data) of + {ok, CId, Packet, CState2} -> + dispatch(Mod, CId, Data, Packet, State2#{connection_state := CState2}); + invalid -> + ?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [ + Transport, Peer, Mod + ]), + State2 + end. + +-spec dispatch( + connection_module(), + esockd_transport:socket(), + connection_id(), + connection_packet(), + state() +) -> + state(). +dispatch( + Mod, + CId, + Data, + Packet, + #{ + transport := Transport, + peer := Peer, + connection_state := CState, + connection_options := Opts + } = + State +) -> + case lookup(Mod, Transport, Peer, CId, Opts) of + {ok, Pid} -> + esockd_udp_proxy_connection:dispatch( + Mod, Pid, CState, {Transport, Data, Packet} + ), + attach(CId, State); + {error, Reason} -> + ?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]), + State + end. + +-spec attach(connection_id(), state()) -> state(). +attach(CId, #{connection_mod := Mod, connection_id := undefined} = State) -> + esockd_udp_proxy_db:attach(Mod, CId), + State#{connection_id := CId}; +attach(CId, #{connection_id := OldId} = State) when CId =/= OldId -> + State2 = detach(State), + attach(CId, State2); +attach(_CId, State) -> + State. + +-spec detach(state()) -> state(). +detach(State) -> + detach(State, true). + +-spec detach(state(), boolean()) -> state(). +detach(#{connection_id := undefined} = State, _Clear) -> + State; +detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) -> + case esockd_udp_proxy_db:detach(Mod, CId) of + {Clear, Pid} -> + case erlang:is_process_alive(Pid) of + true -> + esockd_udp_proxy_connection:close(Mod, Pid, CState); + _ -> + ok + end; + _ -> + ok + end, + State#{connection_id := undefined}. + +-spec socket_exit(state()) -> state(). +socket_exit(State) -> + detach(State). + +-spec heartbeat(timespan()) -> ok. +heartbeat(Span) -> + erlang:send_after(timer:seconds(Span), self(), {?FUNCTION_NAME, Span}), + ok. + +-spec lookup( + connection_module(), + proxy_transport(), + peer(), + connection_id(), + connection_options() +) -> {ok, pid()} | {error, Reason :: term()}. +lookup(Mod, Transport, Peer, CId, Opts) -> + case esockd_udp_proxy_db:lookup(Mod, CId) of + {ok, _} = Ok -> + Ok; + undefined -> + case esockd_udp_proxy_connection:create(Mod, Transport, Peer, Opts) of + {ok, Pid} -> + esockd_udp_proxy_db:insert(Mod, CId, Pid), + _ = erlang:monitor(process, Pid), + {ok, Pid}; + ignore -> + {error, ignore}; + Error -> + Error + end + end. + +-spec send(proxy_transport(), peer(), binary()) -> _. +send({?PROXY_TRANSPORT, _, Socket}, {IP, Port}, Data) when is_port(Socket) -> + gen_udp:send(Socket, IP, Port, Data); +send({?PROXY_TRANSPORT, _, Socket}, _Peer, Data) -> + esockd_transport:send(Socket, Data). + +init_transport({udp, _, Sock}, Peer, State) -> + {ok, State#{ + transport => {?PROXY_TRANSPORT, self(), Sock}, + peer => Peer + }}; +init_transport(esockd_transport, Sock, State) -> + case esockd_transport:wait(Sock) of + {ok, NSock} -> + {ok, State#{ + transport => {?PROXY_TRANSPORT, self(), NSock}, + peer => esockd_transport:peername(NSock) + }}; + Error -> + Error + end. + +close_transport({?PROXY_TRANSPORT, _, Sock}) when is_port(Sock) -> + ok; +close_transport({?PROXY_TRANSPORT, _, Sock}) -> + esockd_transport:fast_close(Sock). diff --git a/src/udp_proxy/esockd_udp_proxy_connection.erl b/src/udp_proxy/esockd_udp_proxy_connection.erl new file mode 100644 index 0000000..e53956b --- /dev/null +++ b/src/udp_proxy/esockd_udp_proxy_connection.erl @@ -0,0 +1,67 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_udp_proxy_connection). + +-include("include/esockd_proxy.hrl"). + +-export([ + initialize/2, + create/4, + get_connection_id/5, + dispatch/4, + close/3 +]). + +-export_type([connection_id/0, connection_module/0]). + +%%-------------------------------------------------------------------- +%%- Callbacks +%%-------------------------------------------------------------------- +-callback initialize(connection_options()) -> connection_state(). + +%% Create new connection +-callback create(proxy_transport(), peer(), connection_options()) -> gen_server:start_ret(). + +%% Find routing information +-callback get_connection_id( + proxy_transport(), peer(), connection_state(), socket_packet() +) -> + get_connection_id_result(). + +%% Dispacth message +-callback dispatch(pid(), connection_state(), proxy_packet()) -> ok. + +%% Close Connection +-callback close(pid(), connection_state()) -> ok. + +%%-------------------------------------------------------------------- +%%- API +%%-------------------------------------------------------------------- +initialize(Mod, Opts) -> + Mod:initialize(Opts). + +create(Mod, Transport, Peer, Opts) -> + Mod:create(Transport, Peer, Opts). + +get_connection_id(Mod, Transport, Peer, State, Data) -> + Mod:get_connection_id(Transport, Peer, State, Data). + +dispatch(Mod, Pid, State, Packet) -> + Mod:dispatch(Pid, State, Packet). + +close(Mod, Pid, State) -> + Mod:close(Pid, State). diff --git a/src/udp_proxy/esockd_udp_proxy_db.erl b/src/udp_proxy/esockd_udp_proxy_db.erl new file mode 100644 index 0000000..d8a5dba --- /dev/null +++ b/src/udp_proxy/esockd_udp_proxy_db.erl @@ -0,0 +1,127 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(esockd_udp_proxy_db). + +-behaviour(gen_server). + +-include("include/esockd_proxy.hrl"). + +%% API +-export([ + start_link/0, + insert/3, + attach/2, + detach/2, + lookup/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-define(ID(Mod, CId), {Mod, CId}). + +-record(connection, { + id :: ?ID(connection_module(), connection_id()), + %% the connection pid + pid :: pid(), + %% Reference Counter + count :: non_neg_integer() +}). + +-define(TAB, esockd_udp_proxy_db). +-define(MINIMUM_VAL, -2147483647). + +%%-------------------------------------------------------------------- +%%- API +%%-------------------------------------------------------------------- +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec insert(connection_module(), connection_id(), pid()) -> boolean(). +insert(Mod, CId, Pid) -> + ets:insert_new(?TAB, #connection{ + id = ?ID(Mod, CId), + pid = Pid, + count = 0 + }). + +-spec attach(connection_module(), connection_id()) -> integer(). +attach(Mod, CId) -> + ets:update_counter(?TAB, ?ID(Mod, CId), {#connection.count, 1}). + +-spec detach(connection_module(), connection_id()) -> {Clear :: true, connection_state()} | false. +detach(Mod, CId) -> + Id = ?ID(Mod, CId), + RC = ets:update_counter(?TAB, Id, {#connection.count, -1, 0, ?MINIMUM_VAL}), + if + RC < 0 -> + case ets:lookup(?TAB, Id) of + [#connection{pid = Pid}] -> + ets:delete(?TAB, Id), + {true, Pid}; + _ -> + false + end; + true -> + false + end. + +-spec lookup(connection_module(), connection_id()) -> {ok, pid()} | undefined. +lookup(Mod, CId) -> + case ets:lookup(?TAB, ?ID(Mod, CId)) of + [#connection{pid = Pid}] -> + {ok, Pid}; + _ -> + undefined + end. + +%%-------------------------------------------------------------------- +%%- gen_server callbacks +%%-------------------------------------------------------------------- +init([]) -> + ?TAB = ets:new(?TAB, [ + set, + public, + named_table, + {keypos, #connection.id}, + {write_concurrency, true}, + {read_concurrency, true} + ]), + {ok, #{}}. + +handle_call(Req, _From, State) -> + error_logger:error_msg("Unexpected call: ~p", [Req]), + {reply, ignore, State}. + +handle_cast(Msg, State) -> + error_logger:error_msg("Unexpected cast: ~p~n", [Msg]), + {noreply, State}. + +handle_info(Info, State) -> + error_logger:error_msg("Unexpected info: ~p~n", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ets:delete(?TAB), + ok.