-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
40 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,31 @@ | ||
{"1.2.0", | ||
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1}, | ||
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, | ||
{<<"jsx">>,{pkg,<<"jsx">>,<<"2.10.0">>},0}, | ||
{<<"lager">>,{pkg,<<"lager">>,<<"3.9.2">>},0}, | ||
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},0}, | ||
{<<"mqtt_packet_map">>,{pkg,<<"mqtt_packet_map">>,<<"1.1.0">>},0}, | ||
{<<"router">>,{pkg,<<"router">>,<<"1.0.4">>},0}, | ||
{<<"sidejob">>,{pkg,<<"sidejob">>,<<"2.1.0">>},0}, | ||
{<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},2}, | ||
{<<"tls_certificate_check">>,{pkg,<<"tls_certificate_check">>,<<"1.6.0">>},1}, | ||
{<<"zotonic_stdlib">>,{pkg,<<"zotonic_stdlib">>,<<"1.4.2">>},0}]}. | ||
{<<"tls_certificate_check">>, | ||
{pkg,<<"tls_certificate_check">>,<<"1.11.0">>}, | ||
1}, | ||
{<<"zotonic_stdlib">>,{pkg,<<"zotonic_stdlib">>,<<"1.6.0">>},0}]}. | ||
[ | ||
{pkg_hash,[ | ||
{<<"cowlib">>, <<"0B9FF9C346629256C42EBE1EEB769A83C6CB771A6EE5960BD110AB0B9B872063">>}, | ||
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, | ||
{<<"jsx">>, <<"77760560D6AC2B8C51FD4C980E9E19B784016AA70BE354CE746472C33BEB0B1C">>}, | ||
{<<"lager">>, <<"4CAB289120EB24964E3886BD22323CB5FEFE4510C076992A23AD18CF85413D8C">>}, | ||
{<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, | ||
{<<"mqtt_packet_map">>, <<"4D44C0673B936491F9BE7AF4908092AB902C0ADFD168158DC73953A3A560C547">>}, | ||
{<<"router">>, <<"E9311B9D296B41C9484FC933EFD847F87E7DCBAD94DEE1566FB77570CA59EF2C">>}, | ||
{<<"sidejob">>, <<"5D6A7C9C620778CB1908E46B552D767DF2ED4D77070BB7B5B8773D4FF18D1D37">>}, | ||
{<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>}, | ||
{<<"tls_certificate_check">>, <<"661C287FC3F7823F6A299834664A5932AC9D369BEFEC2883172DE0623A4AEA12">>}, | ||
{<<"zotonic_stdlib">>, <<"1D361AA0592998E7A5953E38E9440278E9BBF44ECC1E65F7F0F33C8A45759236">>}]}, | ||
{<<"tls_certificate_check">>, <<"609DCD503F31170F0799DAC380EB0E086388CF918FC769AAA60DDD6BBF575218">>}, | ||
{<<"zotonic_stdlib">>, <<"872E85BD2DC8A49A4DA255E4DF6C1B4DDC993E59024D02303BD6A5DF0868859B">>}]}, | ||
{pkg_hash_ext,[ | ||
{<<"cowlib">>, <<"2B3E9DA0B21C4565751A6D4901C20D1B4CC25CBB7FD50D91D2AB6DD287BC86A9">>}, | ||
{<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, | ||
{<<"jsx">>, <<"9A83E3704807298016968DB506F9FAD0F027DE37546EB838B3AE1064C3A0AD62">>}, | ||
{<<"lager">>, <<"7F904D9E87A8CB7E66156ED31768D1C8E26EBA1D54F4BC85B1AA4AC1F6340C28">>}, | ||
{<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, | ||
{<<"mqtt_packet_map">>, <<"82BD12177EC927244ECC40093FB5A9BFAC317106168FD11F7D49FB3434C6D1DA">>}, | ||
{<<"router">>, <<"C9F939968F8D8BCBEB03087829693E5BEEA6BFFD14A9717200FB57522A141838">>}, | ||
{<<"sidejob">>, <<"6DC3DAC041C8C07C64401ECD22684730DA1497F5F14377B3CA9C5B2B9A135181">>}, | ||
{<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>}, | ||
{<<"tls_certificate_check">>, <<"42476E2E40717F09FBF1B559A8A6975B394E46C94BB5EBB043A18269A46401C0">>}, | ||
{<<"zotonic_stdlib">>, <<"6654F4D93DB30AEBD1341255F057BE61138C1AB9F5EECCC0D235CFED21B46B24">>}]} | ||
{<<"tls_certificate_check">>, <<"4AB962212EF7C87482619CB298E1FE06E047B57F0BD58CC417B3B299EB8D036E">>}, | ||
{<<"zotonic_stdlib">>, <<"9139167866615F226C3915B6082B317892E276F3A9C78C5A99CD497FA589444E">>}]} | ||
]. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,8 @@ | ||
%% @doc Sidejobs for handling topic subscriptions | ||
%% @author Marc Worrell <[email protected]> | ||
%% @copyright 2018 Marc Worrell | ||
%% @copyright 2018-2022 Marc Worrell | ||
|
||
%% Copyright 2018 Marc Worrell | ||
%% Copyright 2018-2022 Marc Worrell | ||
%% | ||
%% Licensed under the Apache License, Version 2.0 (the "License"); | ||
%% you may not use this file except in compliance with the License. | ||
|
@@ -27,6 +27,7 @@ | |
]). | ||
|
||
|
||
-include_lib("kernel/include/logger.hrl"). | ||
-include_lib("router/include/router.hrl"). | ||
-include_lib("../include/mqtt_sessions.hrl"). | ||
|
||
|
@@ -41,7 +42,7 @@ publish(Pool, Topic, Routes, Msg, PublisherContext) -> | |
{ok, _JobPid} = OK -> | ||
OK; | ||
{error, overload} -> | ||
lager:debug("MQTT sidejob overload, delaying job ~p ...", [ Topic ]), | ||
?LOG_DEBUG("MQTT sidejob overload, delaying job ~p ...", [ Topic ]), | ||
timer:sleep(100), | ||
sidejob_supervisor:spawn( | ||
?MQTT_SESSIONS_JOBS, | ||
|
@@ -60,7 +61,7 @@ publish_retained(Pool, TopicFilter, Ms, Subscriber, Options, SubscriberContext) | |
{ok, _JobPid} -> | ||
ok; | ||
{error, overload} -> | ||
lager:debug("MQTT sidejob overload, delaying retained job ~p ...", [ TopicFilter ]), | ||
?LOG_DEBUG("MQTT sidejob overload, delaying retained job ~p ...", [ TopicFilter ]), | ||
timer:sleep(100), | ||
sidejob_supervisor:spawn( | ||
?MQTT_SESSIONS_JOBS, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
%% @author Marc Worrell <[email protected]> | ||
%% @copyright 2018 Marc Worrell | ||
%% @copyright 2018-2022 Marc Worrell | ||
|
||
%% Copyright 2018 Marc Worrell | ||
%% Copyright 2018-2022 Marc Worrell | ||
%% | ||
%% Licensed under the Apache License, Version 2.0 (the "License"); | ||
%% you may not use this file except in compliance with the License. | ||
|
@@ -22,6 +22,7 @@ | |
encode/1 | ||
]). | ||
|
||
-include_lib("kernel/include/logger.hrl"). | ||
|
||
-spec encode( mqtt_packet_map:mqtt_packet() ) -> mqtt_packet_map:mqtt_packet(). | ||
encode(#{ type := publish } = Msg) -> | ||
|
@@ -97,7 +98,7 @@ guess_mime({{Y,M,D},{H,I,S}}) when | |
guess_mime(T) when is_tuple(T), is_atom(element(1, T)) -> | ||
<<"application/json">>; | ||
guess_mime(X) -> | ||
lager:info("MQTT payload unknown type for guess_mime: ~p", [X]), | ||
?LOG_INFO("MQTT payload unknown type for guess_mime: ~p", [X]), | ||
<<"binary/octet-stream">>. | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
%% @doc Process handling one single MQTT session. | ||
%% Transports attaches and detaches from this session. | ||
%% @author Marc Worrell <[email protected]> | ||
%% @copyright 2018-2020 Marc Worrell | ||
%% @copyright 2018-2022 Marc Worrell | ||
|
||
%% Copyright 2018-2020 Marc Worrell | ||
%% Copyright 2018-2022 Marc Worrell | ||
%% | ||
%% Licensed under the Apache License, Version 2.0 (the "License"); | ||
%% you may not use this file except in compliance with the License. | ||
|
@@ -110,8 +110,9 @@ | |
}). | ||
|
||
|
||
-include_lib("../include/mqtt_sessions.hrl"). | ||
-include_lib("kernel/include/logger.hrl"). | ||
-include_lib("mqtt_packet_map/include/mqtt_packet_map.hrl"). | ||
-include_lib("../include/mqtt_sessions.hrl"). | ||
|
||
|
||
-spec get_user_context( pid() ) -> {ok, term()} | {error, noproc}. | ||
|
@@ -236,11 +237,11 @@ handle_call({incoming_data, NewData, ConnectionPid}, _From, #state{ incoming_dat | |
{reply, ok, StateRest#state{ keep_alive_counter = 3, incoming_data = Rest }}; | ||
{error, Reason} when is_atom(Reason) -> | ||
% illegal packet, disconnect and wait for new connection | ||
lager:info("Error decoding incoming data: ~p", [ Reason ]), | ||
?LOG_INFO("Error decoding incoming data: ~p", [ Reason ]), | ||
{reply, {error, Reason}, force_disconnect(State)} | ||
end; | ||
handle_call({incoming_data, _NewData, ConnectionPid}, _From, State) -> | ||
lager:debug("MQTT session incoming data from ~p, expected from ~p", [ConnectionPid, State#state.connection_pid]), | ||
?LOG_DEBUG("MQTT session incoming data from ~p, expected from ~p", [ConnectionPid, State#state.connection_pid]), | ||
{reply, {error, wrong_connection}, State}; | ||
handle_call(Cmd, _From, State) -> | ||
{stop, {unknown_cmd, Cmd}, State}. | ||
|
@@ -261,7 +262,7 @@ handle_info({mqtt_msg, #{ type := publish } = MqttMsg}, State) -> | |
{noreply, State1}; | ||
|
||
handle_info({keep_alive, Ref}, #state{ keep_alive_counter = 0, keep_alive_ref = Ref } = State) -> | ||
lager:debug("MQTT past keep_alive, disconnecting transport"), | ||
?LOG_DEBUG("MQTT past keep_alive, disconnecting transport"), | ||
{noreply, force_disconnect(State)}; | ||
handle_info({keep_alive, Ref}, #state{ keep_alive_counter = N, keep_alive_ref = Ref } = State) -> | ||
erlang:send_after(State#state.keep_alive * 500, self(), {keep_alive, Ref}), | ||
|
@@ -299,7 +300,7 @@ handle_info({'DOWN', _Mref, process, Pid, _Reason}, State) -> | |
{noreply, State1}; | ||
|
||
handle_info(Info, State) -> | ||
lager:info("Unknown info message ~p", [Info]), | ||
?LOG_INFO("Unknown info message ~p", [Info]), | ||
{noreply, State}. | ||
|
||
code_change(_Vsn, State, _Extra) -> | ||
|
@@ -350,12 +351,12 @@ handle_incoming(#{ type := connect } = Msg, Options, State) -> | |
handle_incoming(#{ type := auth } = Msg, _Options, State) -> | ||
packet_connect_auth(Msg, State); | ||
handle_incoming(#{ type := Type }, _Options, #state{ connection_pid = undefined } = State) -> | ||
lager:info("Dropping packet for MQTT session ~p ~s (~p) for receiving ~p when not connected.", | ||
?LOG_INFO("Dropping packet for MQTT session ~p ~s (~p) for receiving ~p when not connected.", | ||
[State#state.pool, State#state.client_id, self(), Type]), | ||
{error, not_connected}; | ||
handle_incoming(#{ type := Type }, _Options, #state{ is_session_present = false } = State) -> | ||
% Only AUTH and CONNECT before the CONNACK | ||
lager:info("Killing MQTT session ~p ~s (~p) for receiving ~p when no session started.", | ||
?LOG_INFO("Killing MQTT session ~p ~s (~p) for receiving ~p when no session started.", | ||
[State#state.pool, State#state.client_id, self(), Type]), | ||
{stop, State}; | ||
handle_incoming(#{ type := publish } = Msg, _Options, State) -> | ||
|
@@ -388,7 +389,7 @@ handle_incoming(#{ type := disconnect } = Msg, _Options, State) -> | |
packet_disconnect(Msg, State); | ||
|
||
handle_incoming(#{ type := Type }, _Options, State) -> | ||
lager:info("MQTT dropping unhandled packet with type ~p", [Type]), | ||
?LOG_INFO("MQTT dropping unhandled packet with type ~p", [Type]), | ||
{ok, State}. | ||
|
||
% --------------------------------------------------------------------------------------- | ||
|
@@ -466,7 +467,7 @@ handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS } | |
{ok, State3}; | ||
handle_connect_auth_1({ok, #{ type := connack, reason_code := ReasonCode } = ConnAck, _UserContext1}, _Msg, StateIfAccept, _State) -> | ||
_ = reply_connack(ConnAck, StateIfAccept), | ||
lager:debug("MQTT connect/auth refused (~p): ~p", [ReasonCode, ConnAck]), | ||
?LOG_DEBUG("MQTT connect/auth refused (~p): ~p", [ReasonCode, ConnAck]), | ||
{error, connection_refused}; | ||
handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateIfAccept, _State) -> | ||
State1 = StateIfAccept#state{ | ||
|
@@ -477,7 +478,7 @@ handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateI | |
State2#state.session_expiry_interval, State2#state.user_context), | ||
{ok, State2}; | ||
handle_connect_auth_1({error, Reason}, Msg, _StateIfAccept, _State) -> | ||
lager:info("MQTT connect/auth refused (~p): ~p", [Reason, Msg]), | ||
?LOG_INFO("MQTT connect/auth refused (~p): ~p", [Reason, Msg]), | ||
{error, connection_refused}. | ||
|
||
|
||
|
@@ -612,7 +613,7 @@ packet_pubrel(#{ packet_id := PacketId, reason_code := ?MQTT_RC_SUCCESS }, #stat | |
end; | ||
packet_pubrel(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_rel = WaitRel } = State) -> | ||
% Error server/client out of sync - remove the wait-rel for this packet_id | ||
lager:info("PUBREL with reason ~p for packet ~p", | ||
?LOG_INFO("PUBREL with reason ~p for packet ~p", | ||
[ RC, PacketId ]), | ||
WaitRel1 = maps:remove(PacketId, WaitRel), | ||
{ok, State#state{ awaiting_rel = WaitRel1 }}. | ||
|
@@ -624,7 +625,7 @@ packet_puback(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta | |
{ok, {_MsgNr, puback, _Msg}} -> | ||
maps:remove(PacketId, WaitAck); | ||
{ok, {_MsgNr, Wait, Msg}} -> | ||
lager:warning("PUBACK for message ~p waiting for ~p. Message: ~p", | ||
?LOG_WARNING("PUBACK for message ~p waiting for ~p. Message: ~p", | ||
[ PacketId, Wait, Msg ]), | ||
maps:remove(PacketId, WaitAck); | ||
error -> | ||
|
@@ -640,7 +641,7 @@ packet_pubrec(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_ac | |
{ok, {_MsgNr, pubcomp, _Msg}} -> | ||
maps:remove(PacketId, WaitAck); | ||
{ok, {_MsgNr, Wait, Msg}} -> | ||
lager:warning("PUBREC for message ~p waiting for ~p. Message: ~p", | ||
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p", | ||
[ PacketId, Wait, Msg ]), | ||
maps:remove(PacketId, WaitAck); | ||
error -> | ||
|
@@ -654,7 +655,7 @@ packet_pubrec(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta | |
{ok, {_MsgNr, pubcomp, _Msg}} -> | ||
{WaitAck, ?MQTT_RC_SUCCESS}; | ||
{ok, {_MsgNr, Wait, Msg}} -> | ||
lager:warning("PUBREC for message ~p waiting for ~p. Message: ~p", | ||
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p", | ||
[ PacketId, Wait, Msg ]), | ||
{maps:remove(PacketId, WaitAck), ?MQTT_RC_PACKET_ID_NOT_FOUND}; | ||
error -> | ||
|
@@ -674,7 +675,7 @@ packet_pubcomp(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = St | |
{ok, {_MsgNr, pubcomp, _Msg}} -> | ||
maps:remove(PacketId, WaitAck); | ||
{ok, {_MsgNr, Wait, Msg}} -> | ||
lager:warning("PUBREC for message ~p waiting for ~p. Message: ~p", | ||
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p", | ||
[ PacketId, Wait, Msg ]), | ||
maps:remove(PacketId, WaitAck); | ||
error -> | ||
|