diff --git a/rebar.config b/rebar.config index 3571af6..7b6978c 100644 --- a/rebar.config +++ b/rebar.config @@ -3,14 +3,12 @@ {require_min_otp_vsn, "22"}. {erl_opts, [ - {parse_transform, lager_transform} ]}. {deps, [ - lager, {mqtt_packet_map, "1.1.0"}, zotonic_stdlib, - {jsx, "2.10.0"}, + {jsx, "3.1.0"}, {sidejob, "2.1.0"}, {router, "1.0.4"} ]}. diff --git a/rebar.lock b/rebar.lock index 4347707..293bbdc 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,35 +1,31 @@ {"1.2.0", [{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1}, - {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"jsx">>,{pkg,<<"jsx">>,<<"2.10.0">>},0}, - {<<"lager">>,{pkg,<<"lager">>,<<"3.9.2">>},0}, + {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},0}, {<<"mqtt_packet_map">>,{pkg,<<"mqtt_packet_map">>,<<"1.1.0">>},0}, {<<"router">>,{pkg,<<"router">>,<<"1.0.4">>},0}, {<<"sidejob">>,{pkg,<<"sidejob">>,<<"2.1.0">>},0}, {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},2}, - {<<"tls_certificate_check">>,{pkg,<<"tls_certificate_check">>,<<"1.6.0">>},1}, - {<<"zotonic_stdlib">>,{pkg,<<"zotonic_stdlib">>,<<"1.4.2">>},0}]}. + {<<"tls_certificate_check">>, + {pkg,<<"tls_certificate_check">>,<<"1.11.0">>}, + 1}, + {<<"zotonic_stdlib">>,{pkg,<<"zotonic_stdlib">>,<<"1.6.0">>},0}]}. [ {pkg_hash,[ {<<"cowlib">>, <<"0B9FF9C346629256C42EBE1EEB769A83C6CB771A6EE5960BD110AB0B9B872063">>}, - {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"jsx">>, <<"77760560D6AC2B8C51FD4C980E9E19B784016AA70BE354CE746472C33BEB0B1C">>}, - {<<"lager">>, <<"4CAB289120EB24964E3886BD22323CB5FEFE4510C076992A23AD18CF85413D8C">>}, + {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"mqtt_packet_map">>, <<"4D44C0673B936491F9BE7AF4908092AB902C0ADFD168158DC73953A3A560C547">>}, {<<"router">>, <<"E9311B9D296B41C9484FC933EFD847F87E7DCBAD94DEE1566FB77570CA59EF2C">>}, {<<"sidejob">>, <<"5D6A7C9C620778CB1908E46B552D767DF2ED4D77070BB7B5B8773D4FF18D1D37">>}, {<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>}, - {<<"tls_certificate_check">>, <<"661C287FC3F7823F6A299834664A5932AC9D369BEFEC2883172DE0623A4AEA12">>}, - {<<"zotonic_stdlib">>, <<"1D361AA0592998E7A5953E38E9440278E9BBF44ECC1E65F7F0F33C8A45759236">>}]}, + {<<"tls_certificate_check">>, <<"609DCD503F31170F0799DAC380EB0E086388CF918FC769AAA60DDD6BBF575218">>}, + {<<"zotonic_stdlib">>, <<"872E85BD2DC8A49A4DA255E4DF6C1B4DDC993E59024D02303BD6A5DF0868859B">>}]}, {pkg_hash_ext,[ {<<"cowlib">>, <<"2B3E9DA0B21C4565751A6D4901C20D1B4CC25CBB7FD50D91D2AB6DD287BC86A9">>}, - {<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, - {<<"jsx">>, <<"9A83E3704807298016968DB506F9FAD0F027DE37546EB838B3AE1064C3A0AD62">>}, - {<<"lager">>, <<"7F904D9E87A8CB7E66156ED31768D1C8E26EBA1D54F4BC85B1AA4AC1F6340C28">>}, + {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"mqtt_packet_map">>, <<"82BD12177EC927244ECC40093FB5A9BFAC317106168FD11F7D49FB3434C6D1DA">>}, {<<"router">>, <<"C9F939968F8D8BCBEB03087829693E5BEEA6BFFD14A9717200FB57522A141838">>}, {<<"sidejob">>, <<"6DC3DAC041C8C07C64401ECD22684730DA1497F5F14377B3CA9C5B2B9A135181">>}, {<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>}, - {<<"tls_certificate_check">>, <<"42476E2E40717F09FBF1B559A8A6975B394E46C94BB5EBB043A18269A46401C0">>}, - {<<"zotonic_stdlib">>, <<"6654F4D93DB30AEBD1341255F057BE61138C1AB9F5EECCC0D235CFED21B46B24">>}]} + {<<"tls_certificate_check">>, <<"4AB962212EF7C87482619CB298E1FE06E047B57F0BD58CC417B3B299EB8D036E">>}, + {<<"zotonic_stdlib">>, <<"9139167866615F226C3915B6082B317892E276F3A9C78C5A99CD497FA589444E">>}]} ]. diff --git a/src/mqtt_sessions.app.src b/src/mqtt_sessions.app.src index 5e6658b..0acd5a3 100644 --- a/src/mqtt_sessions.app.src +++ b/src/mqtt_sessions.app.src @@ -15,6 +15,6 @@ ]}, {modules, []}, {maintainers, ["Zotonic Team"]}, - {licenses, ["Apache 2.0"]}, + {licenses, ["Apache-2.0"]}, {links, [ {"GitHub","https://github.com/zotonic/mqtt_sessions"} ]} ]}. diff --git a/src/mqtt_sessions_job.erl b/src/mqtt_sessions_job.erl index a5b6ddc..8ee35d5 100644 --- a/src/mqtt_sessions_job.erl +++ b/src/mqtt_sessions_job.erl @@ -1,8 +1,8 @@ %% @doc Sidejobs for handling topic subscriptions %% @author Marc Worrell -%% @copyright 2018 Marc Worrell +%% @copyright 2018-2022 Marc Worrell -%% Copyright 2018 Marc Worrell +%% Copyright 2018-2022 Marc Worrell %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ ]). +-include_lib("kernel/include/logger.hrl"). -include_lib("router/include/router.hrl"). -include_lib("../include/mqtt_sessions.hrl"). @@ -41,7 +42,7 @@ publish(Pool, Topic, Routes, Msg, PublisherContext) -> {ok, _JobPid} = OK -> OK; {error, overload} -> - lager:debug("MQTT sidejob overload, delaying job ~p ...", [ Topic ]), + ?LOG_DEBUG("MQTT sidejob overload, delaying job ~p ...", [ Topic ]), timer:sleep(100), sidejob_supervisor:spawn( ?MQTT_SESSIONS_JOBS, @@ -60,7 +61,7 @@ publish_retained(Pool, TopicFilter, Ms, Subscriber, Options, SubscriberContext) {ok, _JobPid} -> ok; {error, overload} -> - lager:debug("MQTT sidejob overload, delaying retained job ~p ...", [ TopicFilter ]), + ?LOG_DEBUG("MQTT sidejob overload, delaying retained job ~p ...", [ TopicFilter ]), timer:sleep(100), sidejob_supervisor:spawn( ?MQTT_SESSIONS_JOBS, diff --git a/src/mqtt_sessions_payload.erl b/src/mqtt_sessions_payload.erl index afc5c41..4c2f986 100644 --- a/src/mqtt_sessions_payload.erl +++ b/src/mqtt_sessions_payload.erl @@ -1,7 +1,7 @@ %% @author Marc Worrell -%% @copyright 2018 Marc Worrell +%% @copyright 2018-2022 Marc Worrell -%% Copyright 2018 Marc Worrell +%% Copyright 2018-2022 Marc Worrell %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ encode/1 ]). +-include_lib("kernel/include/logger.hrl"). -spec encode( mqtt_packet_map:mqtt_packet() ) -> mqtt_packet_map:mqtt_packet(). encode(#{ type := publish } = Msg) -> @@ -97,7 +98,7 @@ guess_mime({{Y,M,D},{H,I,S}}) when guess_mime(T) when is_tuple(T), is_atom(element(1, T)) -> <<"application/json">>; guess_mime(X) -> - lager:info("MQTT payload unknown type for guess_mime: ~p", [X]), + ?LOG_INFO("MQTT payload unknown type for guess_mime: ~p", [X]), <<"binary/octet-stream">>. diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index 34277fb..91144fa 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -1,9 +1,9 @@ %% @doc Process handling one single MQTT session. %% Transports attaches and detaches from this session. %% @author Marc Worrell -%% @copyright 2018-2020 Marc Worrell +%% @copyright 2018-2022 Marc Worrell -%% Copyright 2018-2020 Marc Worrell +%% Copyright 2018-2022 Marc Worrell %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -110,8 +110,9 @@ }). --include_lib("../include/mqtt_sessions.hrl"). +-include_lib("kernel/include/logger.hrl"). -include_lib("mqtt_packet_map/include/mqtt_packet_map.hrl"). +-include_lib("../include/mqtt_sessions.hrl"). -spec get_user_context( pid() ) -> {ok, term()} | {error, noproc}. @@ -236,11 +237,11 @@ handle_call({incoming_data, NewData, ConnectionPid}, _From, #state{ incoming_dat {reply, ok, StateRest#state{ keep_alive_counter = 3, incoming_data = Rest }}; {error, Reason} when is_atom(Reason) -> % illegal packet, disconnect and wait for new connection - lager:info("Error decoding incoming data: ~p", [ Reason ]), + ?LOG_INFO("Error decoding incoming data: ~p", [ Reason ]), {reply, {error, Reason}, force_disconnect(State)} end; handle_call({incoming_data, _NewData, ConnectionPid}, _From, State) -> - lager:debug("MQTT session incoming data from ~p, expected from ~p", [ConnectionPid, State#state.connection_pid]), + ?LOG_DEBUG("MQTT session incoming data from ~p, expected from ~p", [ConnectionPid, State#state.connection_pid]), {reply, {error, wrong_connection}, State}; handle_call(Cmd, _From, State) -> {stop, {unknown_cmd, Cmd}, State}. @@ -261,7 +262,7 @@ handle_info({mqtt_msg, #{ type := publish } = MqttMsg}, State) -> {noreply, State1}; handle_info({keep_alive, Ref}, #state{ keep_alive_counter = 0, keep_alive_ref = Ref } = State) -> - lager:debug("MQTT past keep_alive, disconnecting transport"), + ?LOG_DEBUG("MQTT past keep_alive, disconnecting transport"), {noreply, force_disconnect(State)}; handle_info({keep_alive, Ref}, #state{ keep_alive_counter = N, keep_alive_ref = Ref } = State) -> erlang:send_after(State#state.keep_alive * 500, self(), {keep_alive, Ref}), @@ -299,7 +300,7 @@ handle_info({'DOWN', _Mref, process, Pid, _Reason}, State) -> {noreply, State1}; handle_info(Info, State) -> - lager:info("Unknown info message ~p", [Info]), + ?LOG_INFO("Unknown info message ~p", [Info]), {noreply, State}. code_change(_Vsn, State, _Extra) -> @@ -350,12 +351,12 @@ handle_incoming(#{ type := connect } = Msg, Options, State) -> handle_incoming(#{ type := auth } = Msg, _Options, State) -> packet_connect_auth(Msg, State); handle_incoming(#{ type := Type }, _Options, #state{ connection_pid = undefined } = State) -> - lager:info("Dropping packet for MQTT session ~p ~s (~p) for receiving ~p when not connected.", + ?LOG_INFO("Dropping packet for MQTT session ~p ~s (~p) for receiving ~p when not connected.", [State#state.pool, State#state.client_id, self(), Type]), {error, not_connected}; handle_incoming(#{ type := Type }, _Options, #state{ is_session_present = false } = State) -> % Only AUTH and CONNECT before the CONNACK - lager:info("Killing MQTT session ~p ~s (~p) for receiving ~p when no session started.", + ?LOG_INFO("Killing MQTT session ~p ~s (~p) for receiving ~p when no session started.", [State#state.pool, State#state.client_id, self(), Type]), {stop, State}; handle_incoming(#{ type := publish } = Msg, _Options, State) -> @@ -388,7 +389,7 @@ handle_incoming(#{ type := disconnect } = Msg, _Options, State) -> packet_disconnect(Msg, State); handle_incoming(#{ type := Type }, _Options, State) -> - lager:info("MQTT dropping unhandled packet with type ~p", [Type]), + ?LOG_INFO("MQTT dropping unhandled packet with type ~p", [Type]), {ok, State}. % --------------------------------------------------------------------------------------- @@ -466,7 +467,7 @@ handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS } {ok, State3}; handle_connect_auth_1({ok, #{ type := connack, reason_code := ReasonCode } = ConnAck, _UserContext1}, _Msg, StateIfAccept, _State) -> _ = reply_connack(ConnAck, StateIfAccept), - lager:debug("MQTT connect/auth refused (~p): ~p", [ReasonCode, ConnAck]), + ?LOG_DEBUG("MQTT connect/auth refused (~p): ~p", [ReasonCode, ConnAck]), {error, connection_refused}; handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateIfAccept, _State) -> State1 = StateIfAccept#state{ @@ -477,7 +478,7 @@ handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateI State2#state.session_expiry_interval, State2#state.user_context), {ok, State2}; handle_connect_auth_1({error, Reason}, Msg, _StateIfAccept, _State) -> - lager:info("MQTT connect/auth refused (~p): ~p", [Reason, Msg]), + ?LOG_INFO("MQTT connect/auth refused (~p): ~p", [Reason, Msg]), {error, connection_refused}. @@ -612,7 +613,7 @@ packet_pubrel(#{ packet_id := PacketId, reason_code := ?MQTT_RC_SUCCESS }, #stat end; packet_pubrel(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_rel = WaitRel } = State) -> % Error server/client out of sync - remove the wait-rel for this packet_id - lager:info("PUBREL with reason ~p for packet ~p", + ?LOG_INFO("PUBREL with reason ~p for packet ~p", [ RC, PacketId ]), WaitRel1 = maps:remove(PacketId, WaitRel), {ok, State#state{ awaiting_rel = WaitRel1 }}. @@ -624,7 +625,7 @@ packet_puback(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta {ok, {_MsgNr, puback, _Msg}} -> maps:remove(PacketId, WaitAck); {ok, {_MsgNr, Wait, Msg}} -> - lager:warning("PUBACK for message ~p waiting for ~p. Message: ~p", + ?LOG_WARNING("PUBACK for message ~p waiting for ~p. Message: ~p", [ PacketId, Wait, Msg ]), maps:remove(PacketId, WaitAck); error -> @@ -640,7 +641,7 @@ packet_pubrec(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_ac {ok, {_MsgNr, pubcomp, _Msg}} -> maps:remove(PacketId, WaitAck); {ok, {_MsgNr, Wait, Msg}} -> - lager:warning("PUBREC for message ~p waiting for ~p. Message: ~p", + ?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p", [ PacketId, Wait, Msg ]), maps:remove(PacketId, WaitAck); error -> @@ -654,7 +655,7 @@ packet_pubrec(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta {ok, {_MsgNr, pubcomp, _Msg}} -> {WaitAck, ?MQTT_RC_SUCCESS}; {ok, {_MsgNr, Wait, Msg}} -> - lager:warning("PUBREC for message ~p waiting for ~p. Message: ~p", + ?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p", [ PacketId, Wait, Msg ]), {maps:remove(PacketId, WaitAck), ?MQTT_RC_PACKET_ID_NOT_FOUND}; error -> @@ -674,7 +675,7 @@ packet_pubcomp(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = St {ok, {_MsgNr, pubcomp, _Msg}} -> maps:remove(PacketId, WaitAck); {ok, {_MsgNr, Wait, Msg}} -> - lager:warning("PUBREC for message ~p waiting for ~p. Message: ~p", + ?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p", [ PacketId, Wait, Msg ]), maps:remove(PacketId, WaitAck); error ->