commit 5111914a9b1b92b0b497f825c77bdd365e3989b0 Author: anlicheng <244108715@qq.com> Date: Fri May 16 15:50:44 2025 +0800 init project diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1c4554 --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..20015b1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,191 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2025, anlicheng <244108715@qq.com>. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..8024f9d --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +emqtt +===== + +An OTP library + +Build +----- + + $ rebar3 compile diff --git a/include/emqtt.hrl b/include/emqtt.hrl new file mode 100644 index 0000000..48cc014 --- /dev/null +++ b/include/emqtt.hrl @@ -0,0 +1,535 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQTT_HRL). +-define(EMQTT_HRL, true). + +%%-------------------------------------------------------------------- +%% MQTT Protocol Version and Names +%%-------------------------------------------------------------------- + +-define(MQTT_PROTO_V3, 3). +-define(MQTT_PROTO_V4, 4). +-define(MQTT_PROTO_V5, 5). + +-define(PROTOCOL_NAMES, [ + {?MQTT_PROTO_V3, <<"MQIsdp">>}, + {?MQTT_PROTO_V4, <<"MQTT">>}, + {?MQTT_PROTO_V5, <<"MQTT">>}]). + +%%-------------------------------------------------------------------- +%% MQTT QoS Levels +%%-------------------------------------------------------------------- + +-define(QOS_0, 0). %% At most once +-define(QOS_1, 1). %% At least once +-define(QOS_2, 2). %% Exactly once + +-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). + +-define(QOS_I(Name), + begin + (case Name of + ?QOS_0 -> ?QOS_0; + qos0 -> ?QOS_0; + at_most_once -> ?QOS_0; + ?QOS_1 -> ?QOS_1; + qos1 -> ?QOS_1; + at_least_once -> ?QOS_1; + ?QOS_2 -> ?QOS_2; + qos2 -> ?QOS_2; + exactly_once -> ?QOS_2 + end) + end). + +-define(IS_QOS_NAME(I), + (I =:= qos0 orelse I =:= at_most_once orelse + I =:= qos1 orelse I =:= at_least_once orelse + I =:= qos2 orelse I =:= exactly_once)). + +%%-------------------------------------------------------------------- +%% Maximum ClientId Length. +%%-------------------------------------------------------------------- + +-define(MAX_CLIENTID_LEN, 65535). + +%%-------------------------------------------------------------------- +%% MQTT Control Packet Types +%%-------------------------------------------------------------------- + +-define(RESERVED, 0). %% Reserved +-define(CONNECT, 1). %% Client request to connect to Server +-define(CONNACK, 2). %% Server to Client: Connect acknowledgment +-define(PUBLISH, 3). %% Publish message +-define(PUBACK, 4). %% Publish acknowledgment +-define(PUBREC, 5). %% Publish received (assured delivery part 1) +-define(PUBREL, 6). %% Publish release (assured delivery part 2) +-define(PUBCOMP, 7). %% Publish complete (assured delivery part 3) +-define(SUBSCRIBE, 8). %% Client subscribe request +-define(SUBACK, 9). %% Server Subscribe acknowledgment +-define(UNSUBSCRIBE, 10). %% Unsubscribe request +-define(UNSUBACK, 11). %% Unsubscribe acknowledgment +-define(PINGREQ, 12). %% PING request +-define(PINGRESP, 13). %% PING response +-define(DISCONNECT, 14). %% Client or Server is disconnecting +-define(AUTH, 15). %% Authentication exchange + +-define(TYPE_NAMES, [ + 'CONNECT', + 'CONNACK', + 'PUBLISH', + 'PUBACK', + 'PUBREC', + 'PUBREL', + 'PUBCOMP', + 'SUBSCRIBE', + 'SUBACK', + 'UNSUBSCRIBE', + 'UNSUBACK', + 'PINGREQ', + 'PINGRESP', + 'DISCONNECT', + 'AUTH']). + +%%-------------------------------------------------------------------- +%% MQTT V3.1.1 Connect Return Codes +%%-------------------------------------------------------------------- + +-define(CONNACK_ACCEPT, 0). %% Connection accepted +-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version +-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server +-define(CONNACK_SERVER, 3). %% Server unavailable +-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed +-define(CONNACK_AUTH, 5). %% Client is not authorized to connect + +%%-------------------------------------------------------------------- +%% MQTT V5.0 Reason Codes +%%-------------------------------------------------------------------- + +-define(RC_SUCCESS, 16#00). +-define(RC_NORMAL_DISCONNECTION, 16#00). +-define(RC_GRANTED_QOS_0, 16#00). +-define(RC_GRANTED_QOS_1, 16#01). +-define(RC_GRANTED_QOS_2, 16#02). +-define(RC_DISCONNECT_WITH_WILL_MESSAGE, 16#04). +-define(RC_NO_MATCHING_SUBSCRIBERS, 16#10). +-define(RC_NO_SUBSCRIPTION_EXISTED, 16#11). +-define(RC_CONTINUE_AUTHENTICATION, 16#18). +-define(RC_RE_AUTHENTICATE, 16#19). +-define(RC_UNSPECIFIED_ERROR, 16#80). +-define(RC_MALFORMED_PACKET, 16#81). +-define(RC_PROTOCOL_ERROR, 16#82). +-define(RC_IMPLEMENTATION_SPECIFIC_ERROR, 16#83). +-define(RC_UNSUPPORTED_PROTOCOL_VERSION, 16#84). +-define(RC_CLIENT_IDENTIFIER_NOT_VALID, 16#85). +-define(RC_BAD_USER_NAME_OR_PASSWORD, 16#86). +-define(RC_NOT_AUTHORIZED, 16#87). +-define(RC_SERVER_UNAVAILABLE, 16#88). +-define(RC_SERVER_BUSY, 16#89). +-define(RC_BANNED, 16#8A). +-define(RC_SERVER_SHUTTING_DOWN, 16#8B). +-define(RC_BAD_AUTHENTICATION_METHOD, 16#8C). +-define(RC_KEEP_ALIVE_TIMEOUT, 16#8D). +-define(RC_SESSION_TAKEN_OVER, 16#8E). +-define(RC_TOPIC_FILTER_INVALID, 16#8F). +-define(RC_TOPIC_NAME_INVALID, 16#90). +-define(RC_PACKET_IDENTIFIER_IN_USE, 16#91). +-define(RC_PACKET_IDENTIFIER_NOT_FOUND, 16#92). +-define(RC_RECEIVE_MAXIMUM_EXCEEDED, 16#93). +-define(RC_TOPIC_ALIAS_INVALID, 16#94). +-define(RC_PACKET_TOO_LARGE, 16#95). +-define(RC_MESSAGE_RATE_TOO_HIGH, 16#96). +-define(RC_QUOTA_EXCEEDED, 16#97). +-define(RC_ADMINISTRATIVE_ACTION, 16#98). +-define(RC_PAYLOAD_FORMAT_INVALID, 16#99). +-define(RC_RETAIN_NOT_SUPPORTED, 16#9A). +-define(RC_QOS_NOT_SUPPORTED, 16#9B). +-define(RC_USE_ANOTHER_SERVER, 16#9C). +-define(RC_SERVER_MOVED, 16#9D). +-define(RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED, 16#9E). +-define(RC_CONNECTION_RATE_EXCEEDED, 16#9F). +-define(RC_MAXIMUM_CONNECT_TIME, 16#A0). +-define(RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED, 16#A1). +-define(RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED, 16#A2). + +%%-------------------------------------------------------------------- +%% Maximum MQTT Packet ID and Length +%%-------------------------------------------------------------------- + +-define(MAX_PACKET_ID, 16#ffff). +-define(MAX_PACKET_SIZE, 16#fffffff). + +%%-------------------------------------------------------------------- +%% MQTT Frame Mask +%%-------------------------------------------------------------------- + +-define(HIGHBIT, 2#10000000). +-define(LOWBITS, 2#01111111). + +%%-------------------------------------------------------------------- +%% MQTT Packet Fixed Header +%%-------------------------------------------------------------------- + +-record(mqtt_packet_header, { + type = ?RESERVED, + dup = false, + qos = ?QOS_0, + retain = false + }). + +%%-------------------------------------------------------------------- +%% MQTT Packets +%%-------------------------------------------------------------------- + +-define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling + rap => 0, %% Retain as Publish + nl => 0, %% No Local + qos => 0 %% QoS + }). + +-record(mqtt_packet_connect, { + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V4, + is_bridge = false, + clean_start = true, + will_flag = false, + will_qos = ?QOS_0, + will_retain = false, + keepalive = 0, + properties = undefined, + clientid = <<>>, + will_props = undefined, + will_topic = undefined, + will_payload = undefined, + username = undefined, + password = undefined + }). + +-record(mqtt_packet_connack, { + ack_flags, + reason_code, + properties + }). + +-record(mqtt_packet_publish, { + topic_name, + packet_id, + properties + }). + +-record(mqtt_packet_puback, { + packet_id, + reason_code, + properties + }). + +-record(mqtt_packet_subscribe, { + packet_id, + properties, + topic_filters + }). + +-record(mqtt_packet_suback, { + packet_id, + properties, + reason_codes + }). + +-record(mqtt_packet_unsubscribe, { + packet_id, + properties, + topic_filters + }). + +-record(mqtt_packet_unsuback, { + packet_id, + properties, + reason_codes + }). + +-record(mqtt_packet_disconnect, { + reason_code, + properties + }). + +-record(mqtt_packet_auth, { + reason_code, + properties + }). + +%%-------------------------------------------------------------------- +%% MQTT Message +%%-------------------------------------------------------------------- + +-record(mqtt_msg, { + qos = ?QOS_0 :: emqtt:qos(), + retain = false :: boolean(), + dup = false :: boolean(), + packet_id :: emqtt:packet_id(), + topic :: emqtt:topic(), + props :: emqtt:properties(), + payload :: binary() + }). + +%%-------------------------------------------------------------------- +%% MQTT Control Packet +%%-------------------------------------------------------------------- + +-record(mqtt_packet, { + header :: #mqtt_packet_header{}, + variable :: #mqtt_packet_connect{} + | #mqtt_packet_connack{} + | #mqtt_packet_publish{} + | #mqtt_packet_puback{} + | #mqtt_packet_subscribe{} + | #mqtt_packet_suback{} + | #mqtt_packet_unsubscribe{} + | #mqtt_packet_unsuback{} + | #mqtt_packet_disconnect{} + | #mqtt_packet_auth{} + | pos_integer() + | undefined, + payload :: binary() | undefined + }). + +%%-------------------------------------------------------------------- +%% MQTT Packet Match +%%-------------------------------------------------------------------- + +-define(CONNECT_PACKET(Var), + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, + variable = Var}). + +-define(CONNACK_PACKET(ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, + variable = #mqtt_packet_connack{ack_flags = 0, + reason_code = ReasonCode} + }). + +-define(CONNACK_PACKET(ReasonCode, SessPresent), + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, + variable = #mqtt_packet_connack{ack_flags = SessPresent, + reason_code = ReasonCode} + }). + +-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, + variable = #mqtt_packet_connack{ack_flags = SessPresent, + reason_code = ReasonCode, + properties = Properties} + }). + +-define(AUTH_PACKET(), + #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, + variable = #mqtt_packet_auth{reason_code = 0} + }). + +-define(AUTH_PACKET(ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, + variable = #mqtt_packet_auth{reason_code = ReasonCode} + }). + +-define(AUTH_PACKET(ReasonCode, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, + variable = #mqtt_packet_auth{reason_code = ReasonCode, + properties = Properties} + }). + +-define(PUBLISH_PACKET(QoS), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = QoS}}). + +-define(PUBLISH_PACKET(QoS, PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = QoS}, + variable = #mqtt_packet_publish{packet_id = PacketId} + }). + +-define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = QoS}, + variable = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId}, + payload = Payload + }). + +-define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = QoS}, + variable = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + properties = Properties}, + payload = Payload + }). + +-define(PUBACK_PACKET(PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0} + }). + +-define(PUBACK_PACKET(PacketId, ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode} + }). + +-define(PUBACK_PACKET(PacketId, ReasonCode, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties} + }). + +-define(PUBREC_PACKET(PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0} + }). + +-define(PUBREC_PACKET(PacketId, ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode} + }). + +-define(PUBREC_PACKET(PacketId, ReasonCode, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties} + }). + +-define(PUBREL_PACKET(PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, + qos = ?QOS_1}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0} + }). + +-define(PUBREL_PACKET(PacketId, ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, + qos = ?QOS_1}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode} + }). + +-define(PUBREL_PACKET(PacketId, ReasonCode, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, + qos = ?QOS_1}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties} + }). + +-define(PUBCOMP_PACKET(PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0} + }). + +-define(PUBCOMP_PACKET(PacketId, ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode} + }). + +-define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties} + }). + +-define(SUBSCRIBE_PACKET(PacketId, TopicFilters), + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, + qos = ?QOS_1}, + variable = #mqtt_packet_subscribe{packet_id = PacketId, + topic_filters = TopicFilters} + }). + +-define(SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, + qos = ?QOS_1}, + variable = #mqtt_packet_subscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters} + }). + +-define(SUBACK_PACKET(PacketId, ReasonCodes), + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, + variable = #mqtt_packet_suback{packet_id = PacketId, + reason_codes = ReasonCodes} + }). + +-define(SUBACK_PACKET(PacketId, Properties, ReasonCodes), + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, + variable = #mqtt_packet_suback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes} + }). + +-define(UNSUBSCRIBE_PACKET(PacketId, TopicFilters), + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, + qos = ?QOS_1}, + variable = #mqtt_packet_unsubscribe{packet_id = PacketId, + topic_filters = TopicFilters} + }). + +-define(UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, + qos = ?QOS_1}, + variable = #mqtt_packet_unsubscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters} + }). + +-define(UNSUBACK_PACKET(PacketId), + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, + variable = #mqtt_packet_unsuback{packet_id = PacketId} + }). + +-define(UNSUBACK_PACKET(PacketId, ReasonCodes), + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, + variable = #mqtt_packet_unsuback{packet_id = PacketId, + reason_codes = ReasonCodes} + }). + +-define(UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), + #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, + variable = #mqtt_packet_unsuback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes} + }). + +-define(DISCONNECT_PACKET(), + #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, + variable = #mqtt_packet_disconnect{reason_code = 0} + }). + +-define(DISCONNECT_PACKET(ReasonCode), + #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, + variable = #mqtt_packet_disconnect{reason_code = ReasonCode} + }). + +-define(DISCONNECT_PACKET(ReasonCode, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, + variable = #mqtt_packet_disconnect{reason_code = ReasonCode, + properties = Properties} + }). + +-define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). + +-endif. diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..f618f3e --- /dev/null +++ b/rebar.config @@ -0,0 +1,2 @@ +{erl_opts, [debug_info]}. +{deps, []}. \ No newline at end of file diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..57afcca --- /dev/null +++ b/rebar.lock @@ -0,0 +1 @@ +[]. diff --git a/src/emqtt.app.src b/src/emqtt.app.src new file mode 100644 index 0000000..2d3da15 --- /dev/null +++ b/src/emqtt.app.src @@ -0,0 +1,14 @@ +{application, emqtt, + [{description, "An OTP library"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/src/emqtt.erl b/src/emqtt.erl new file mode 100644 index 0000000..586c828 --- /dev/null +++ b/src/emqtt.erl @@ -0,0 +1,1319 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqtt). + +-behaviour(gen_statem). + +-include("emqtt.hrl"). + +-export([start_link/0, start_link/1]). + +-export([connect/1, connect/2, disconnect/1, disconnect/2, disconnect/3]). + +-export([ping/1]). + +%% PubSub +-export([ subscribe/2, subscribe/3, subscribe/4, publish/2, publish/3, publish/4, publish/5, unsubscribe/2, unsubscribe/3]). + +%% Puback... +-export([puback/2, puback/3, puback/4, pubrec/2, pubrec/3, pubrec/4, pubrel/2, pubrel/3, pubrel/4, pubcomp/2, pubcomp/3, pubcomp/4 ]). + +-export([subscriptions/1]). + +-export([info/1, stop/1]). + +%% For test cases +-export([ pause/1, resume/1 ]). + +-export([ initialized/3, waiting_for_connack/3, connected/3, inflight_full/3, random_client_id/0, reason_code_name/1 ]). + +-export([ init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4 ]). + +-export_type([host/0 , option/0 , properties/0 , payload/0 , pubopt/0 , subopt/0 , mqtt_msg/0 , client/0]). + +-type(host() :: inet:ip_address() | inet:hostname()). + +%% Message handler is a set of callbacks defined to handle MQTT messages +%% as well as the disconnect event. +-define(NO_MSG_HDLR, undefined). + +-type(mfas() :: {module(), atom(), list()} | {function(), list()}). + +-type(msg_handler() :: #{puback := fun((_) -> any()) | mfas(), + publish := fun((emqx_types:message()) -> any()) | mfas(), + disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) | mfas() + }). + +-type(option() :: {name, atom()} + | {owner, pid()} + | {msg_handler, msg_handler()} + | {host, host()} + | {hosts, [{host(), inet:port_number()}]} + | {port, inet:port_number()} + | {tcp_opts, [gen_tcp:option()]} + | {ssl, boolean()} + | {ssl_opts, [ssl:ssl_option()]} + | {ws_path, string()} + | {connect_timeout, pos_integer()} + | {bridge_mode, boolean()} + | {clientid, iodata()} + | {clean_start, boolean()} + | {username, iodata()} + | {password, iodata()} + | {proto_ver, v3 | v4 | v5} + | {keepalive, non_neg_integer()} + | {max_inflight, pos_integer()} + | {retry_interval, timeout()} + | {will_topic, iodata()} + | {will_payload, iodata()} + | {will_retain, boolean()} + | {will_qos, qos()} + | {will_props, properties()} + | {auto_ack, boolean()} + | {ack_timeout, pos_integer()} + | {force_ping, boolean()} + | {properties, properties()}). + +-type(maybe(T) :: undefined | T). +-type(topic() :: binary()). +-type(payload() :: iodata()). +-type(packet_id() :: 0..16#FFFF). +-type(reason_code() :: 0..16#FF). +-type(properties() :: #{atom() => term()}). +-type(version() :: ?MQTT_PROTO_V3 + | ?MQTT_PROTO_V4 + | ?MQTT_PROTO_V5). +-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). +-type(qos_name() :: qos0 | at_most_once | + qos1 | at_least_once | + qos2 | exactly_once). +-type(pubopt() :: {retain, boolean()} + | {qos, qos() | qos_name()}). +-type(subopt() :: {rh, 0 | 1 | 2} + | {rap, boolean()} + | {nl, boolean()} + | {qos, qos() | qos_name()}). + +-type(subscribe_ret() :: + {ok, properties(), [reason_code()]} | {error, term()}). + +-type(client() :: pid() | atom()). + +-opaque(mqtt_msg() :: #mqtt_msg{}). + +-record(state, { + name :: atom(), + owner :: pid(), + msg_handler :: ?NO_MSG_HDLR | msg_handler(), + host :: host(), + port :: inet:port_number(), + hosts :: [{host(), inet:port_number()}], + socket :: inet:socket() | pid(), + sock_opts :: [emqtt_sock:option()|emqtt_ws:option()], + connect_timeout :: pos_integer(), + bridge_mode :: boolean(), + clientid :: binary(), + clean_start :: boolean(), + username :: maybe(binary()), + password :: maybe(binary()), + proto_ver :: version(), + proto_name :: iodata(), + keepalive :: non_neg_integer(), + keepalive_timer :: maybe(reference()), + force_ping :: boolean(), + paused :: boolean(), + will_flag :: boolean(), + will_msg :: mqtt_msg(), + properties :: properties(), + pending_calls :: list(), + subscriptions :: map(), + max_inflight :: infinity | pos_integer(), + inflight :: #{packet_id() => term()}, + awaiting_rel :: map(), + auto_ack :: boolean(), + ack_timeout :: pos_integer(), + ack_timer :: reference(), + retry_interval :: pos_integer(), + retry_timer :: reference(), + session_present :: boolean(), + last_packet_id :: packet_id(), + parse_state :: emqtt_frame:parse_state() + }). + +-record(call, { + id, + from, + req, + ts +}). + +%% Default timeout +-define(DEFAULT_KEEPALIVE, 60). +-define(DEFAULT_RETRY_INTERVAL, 30000). +-define(DEFAULT_ACK_TIMEOUT, 30000). +-define(DEFAULT_CONNECT_TIMEOUT, 60000). + +-define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}). + +-define(WILL_MSG(QoS, Retain, Topic, Props, Payload), + #mqtt_msg{qos = QoS, + retain = Retain, + topic = Topic, + props = Props, + payload = Payload + }). + +-define(NO_CLIENT_ID, <<>>). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec(start_link() -> gen_statem:start_ret()). +start_link() -> start_link([]). + +-spec(start_link(map() | [option()]) -> gen_statem:start_ret()). +start_link(Options) when is_map(Options) -> + start_link(maps:to_list(Options)); +start_link(Options) when is_list(Options) -> + ok = emqtt_props:validate( + proplists:get_value(properties, Options, #{})), + case proplists:get_value(name, Options) of + undefined -> + gen_statem:start_link(?MODULE, [with_owner(Options)], []); + Name when is_atom(Name) -> + gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], []) + end. + +with_owner(Options) -> + case proplists:get_value(owner, Options) of + Owner when is_pid(Owner) -> + Options; + undefined -> + [{owner, self()} | Options] + end. + +-spec(connect(client()) -> {ok, properties()} | {error, term()}). +connect(Client) -> + connect(Client, infinity). +connect(Client, Timeout) -> + gen_statem:call(Client, connect, Timeout). + +-spec(subscribe(client(), topic() | {topic(), qos() | qos_name() | [subopt()]} | [{topic(), qos()}]) + -> subscribe_ret()). +subscribe(Client, Topic) when is_binary(Topic) -> + subscribe(Client, {Topic, ?QOS_0}); +subscribe(Client, {Topic, QoS}) when is_binary(Topic), is_atom(QoS) -> + subscribe(Client, {Topic, ?QOS_I(QoS)}); +subscribe(Client, {Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) -> + subscribe(Client, [{Topic, ?QOS_I(QoS)}]); +subscribe(Client, Topics) when is_list(Topics) -> + subscribe(Client, #{}, lists:map( + fun({Topic, QoS}) when is_binary(Topic), is_atom(QoS) -> + {Topic, [{qos, ?QOS_I(QoS)}]}; + ({Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) -> + {Topic, [{qos, ?QOS_I(QoS)}]}; + ({Topic, Opts}) when is_binary(Topic), is_list(Opts) -> + {Topic, Opts} + end, Topics)). + +-spec(subscribe(client(), topic(), qos() | qos_name() | [subopt()]) -> + subscribe_ret(); + (client(), properties(), [{topic(), qos() | [subopt()]}]) -> + subscribe_ret()). +subscribe(Client, Topic, QoS) when is_binary(Topic), is_atom(QoS) -> + subscribe(Client, Topic, ?QOS_I(QoS)); +subscribe(Client, Topic, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> + subscribe(Client, Topic, [{qos, QoS}]); +subscribe(Client, Topic, Opts) when is_binary(Topic), is_list(Opts) -> + subscribe(Client, #{}, [{Topic, Opts}]); +subscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) -> + Topics1 = [{Topic, parse_subopt(Opts)} || {Topic, Opts} <- Topics], + gen_statem:call(Client, {subscribe, Properties, Topics1}). + +-spec(subscribe(client(), properties(), topic(), qos() | qos_name() | [subopt()]) + -> subscribe_ret()). +subscribe(Client, Properties, Topic, QoS) + when is_map(Properties), is_binary(Topic), is_atom(QoS) -> + subscribe(Client, Properties, Topic, ?QOS_I(QoS)); +subscribe(Client, Properties, Topic, QoS) + when is_map(Properties), is_binary(Topic), ?IS_QOS(QoS) -> + subscribe(Client, Properties, Topic, [{qos, QoS}]); +subscribe(Client, Properties, Topic, Opts) + when is_map(Properties), is_binary(Topic), is_list(Opts) -> + subscribe(Client, Properties, [{Topic, Opts}]). + +parse_subopt(Opts) -> + parse_subopt(Opts, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}). + +parse_subopt([], Result) -> + Result; +parse_subopt([{rh, I} | Opts], Result) when I >= 0, I =< 2 -> + parse_subopt(Opts, Result#{rh := I}); +parse_subopt([{rap, true} | Opts], Result) -> + parse_subopt(Opts, Result#{rap := 1}); +parse_subopt([{rap, false} | Opts], Result) -> + parse_subopt(Opts, Result#{rap := 0}); +parse_subopt([{nl, true} | Opts], Result) -> + parse_subopt(Opts, Result#{nl := 1}); +parse_subopt([{nl, false} | Opts], Result) -> + parse_subopt(Opts, Result#{nl := 0}); +parse_subopt([{qos, QoS} | Opts], Result) -> + parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}); +parse_subopt([_ | Opts], Result) -> + parse_subopt(Opts, Result). + +-spec(publish(client(), topic(), payload()) -> ok | {error, term()}). +publish(Client, Topic, Payload) when is_binary(Topic) -> + publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}). + +-spec(publish(client(), topic(), payload(), qos() | qos_name() | [pubopt()]) + -> ok | {ok, packet_id()} | {error, term()}). +publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) -> + publish(Client, Topic, Payload, [{qos, ?QOS_I(QoS)}]); +publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> + publish(Client, Topic, Payload, [{qos, QoS}]); +publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) -> + publish(Client, Topic, #{}, Payload, Opts). + +-spec(publish(client(), topic(), properties(), payload(), [pubopt()]) + -> ok | {ok, packet_id()} | {error, term()}). +publish(Client, Topic, Properties, Payload, Opts) + when is_binary(Topic), is_map(Properties), is_list(Opts) -> + ok = emqtt_props:validate(Properties), + Retain = proplists:get_bool(retain, Opts), + QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), + publish(Client, #mqtt_msg{qos = QoS, + retain = Retain, + topic = Topic, + props = Properties, + payload = iolist_to_binary(Payload)}). + +-spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}). +publish(Client, Msg) -> + gen_statem:call(Client, {publish, Msg}). + +-spec(unsubscribe(client(), topic() | [topic()]) -> subscribe_ret()). +unsubscribe(Client, Topic) when is_binary(Topic) -> + unsubscribe(Client, [Topic]); +unsubscribe(Client, Topics) when is_list(Topics) -> + unsubscribe(Client, #{}, Topics). + +-spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()). +unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) -> + unsubscribe(Client, Properties, [Topic]); +unsubscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) -> + gen_statem:call(Client, {unsubscribe, Properties, Topics}). + +-spec(ping(client()) -> pong). +ping(Client) -> + gen_statem:call(Client, ping). + +-spec(disconnect(client()) -> ok). +disconnect(Client) -> + disconnect(Client, ?RC_SUCCESS). + +-spec(disconnect(client(), reason_code()) -> ok). +disconnect(Client, ReasonCode) -> + disconnect(Client, ReasonCode, #{}). + +-spec(disconnect(client(), reason_code(), properties()) -> ok). +disconnect(Client, ReasonCode, Properties) -> + gen_statem:call(Client, {disconnect, ReasonCode, Properties}). + +%%-------------------------------------------------------------------- +%% For test cases +%%-------------------------------------------------------------------- + +puback(Client, PacketId) when is_integer(PacketId) -> + puback(Client, PacketId, ?RC_SUCCESS). +puback(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> + puback(Client, PacketId, ReasonCode, #{}). +puback(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> + gen_statem:cast(Client, {puback, PacketId, ReasonCode, Properties}). + +pubrec(Client, PacketId) when is_integer(PacketId) -> + pubrec(Client, PacketId, ?RC_SUCCESS). +pubrec(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> + pubrec(Client, PacketId, ReasonCode, #{}). +pubrec(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> + gen_statem:cast(Client, {pubrec, PacketId, ReasonCode, Properties}). + +pubrel(Client, PacketId) when is_integer(PacketId) -> + pubrel(Client, PacketId, ?RC_SUCCESS). +pubrel(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> + pubrel(Client, PacketId, ReasonCode, #{}). +pubrel(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> + gen_statem:cast(Client, {pubrel, PacketId, ReasonCode, Properties}). + +pubcomp(Client, PacketId) when is_integer(PacketId) -> + pubcomp(Client, PacketId, ?RC_SUCCESS). +pubcomp(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> + pubcomp(Client, PacketId, ReasonCode, #{}). +pubcomp(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> + gen_statem:cast(Client, {pubcomp, PacketId, ReasonCode, Properties}). + +subscriptions(Client) -> + gen_statem:call(Client, subscriptions). + +info(Client) -> + gen_statem:call(Client, info). + +stop(Client) -> + gen_statem:call(Client, stop). + +pause(Client) -> + gen_statem:call(Client, pause). + +resume(Client) -> + gen_statem:call(Client, resume). + +%%-------------------------------------------------------------------- +%% gen_statem callbacks +%%-------------------------------------------------------------------- + +init([Options]) -> + process_flag(trap_exit, true), + ClientId = case {proplists:get_value(proto_ver, Options, v4), + proplists:get_value(clientid, Options)} of + {v5, undefined} -> ?NO_CLIENT_ID; + {_ver, undefined} -> random_client_id(); + {_ver, Id} -> iolist_to_binary(Id) + end, + State = init(Options, #state{host = {127,0,0,1}, + port = 1883, + hosts = [], + sock_opts = [], + bridge_mode = false, + clientid = ClientId, + clean_start = true, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + keepalive = ?DEFAULT_KEEPALIVE, + force_ping = false, + paused = false, + will_flag = false, + will_msg = #mqtt_msg{}, + pending_calls = [], + subscriptions = #{}, + max_inflight = infinity, + inflight = #{}, + awaiting_rel = #{}, + properties = #{}, + auto_ack = true, + ack_timeout = ?DEFAULT_ACK_TIMEOUT, + retry_interval = ?DEFAULT_RETRY_INTERVAL, + connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, + last_packet_id = 1 + }), + {ok, initialized, init_parse_state(State)}. + +random_client_id() -> + rand:seed(exsplus, erlang:timestamp()), + I1 = rand:uniform(round(math:pow(2, 48))) - 1, + I2 = rand:uniform(round(math:pow(2, 32))) - 1, + {ok, Host} = inet:gethostname(), + RandId = io_lib:format("~12.16.0b~8.16.0b", [I1, I2]), + iolist_to_binary(["emqtt-", Host, "-", RandId]). + +init([], State) -> + State; +init([{name, Name} | Opts], State) -> + init(Opts, State#state{name = Name}); +init([{owner, Owner} | Opts], State) when is_pid(Owner) -> + link(Owner), + init(Opts, State#state{owner = Owner}); +init([{msg_handler, Hdlr} | Opts], State) -> + init(Opts, State#state{msg_handler = Hdlr}); +init([{host, Host} | Opts], State) -> + init(Opts, State#state{host = Host}); +init([{port, Port} | Opts], State) -> + init(Opts, State#state{port = Port}); +init([{hosts, Hosts} | Opts], State) -> + Hosts1 = + lists:foldl(fun({Host, Port}, Acc) -> + [{Host, Port}|Acc]; + (Host, Acc) -> + [{Host, 1883}|Acc] + end, [], Hosts), + init(Opts, State#state{hosts = Hosts1}); +init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) -> + init(Opts, State#state{sock_opts = merge_opts(SockOpts, TcpOpts)}); +init([{ssl, EnableSsl} | Opts], State) -> + case lists:keytake(ssl_opts, 1, Opts) of + {value, SslOpts, WithOutSslOpts} -> + init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State); + false -> + init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State) + end; +init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) -> + case lists:keytake(ssl, 1, Opts) of + {value, {ssl, true}, WithOutEnableSsl} -> + ok = ssl:start(), + SockOpts1 = merge_opts(SockOpts, [{ssl_opts, SslOpts}]), + init(WithOutEnableSsl, State#state{sock_opts = SockOpts1}); + {value, {ssl, false}, WithOutEnableSsl} -> + init(WithOutEnableSsl, State); + false -> + init(Opts, State) + end; +init([{ws_path, Path} | Opts], State = #state{sock_opts = SockOpts}) -> + init(Opts, State#state{sock_opts = [{ws_path, Path}|SockOpts]}); +init([{clientid, ClientId} | Opts], State) -> + init(Opts, State#state{clientid = iolist_to_binary(ClientId)}); +init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> + init(Opts, State#state{clean_start = CleanStart}); +init([{username, Username} | Opts], State) -> + init(Opts, State#state{username = iolist_to_binary(Username)}); +init([{password, Password} | Opts], State) -> + init(Opts, State#state{password = iolist_to_binary(Password)}); +init([{keepalive, Secs} | Opts], State) -> + init(Opts, State#state{keepalive = Secs}); +init([{proto_ver, v3} | Opts], State) -> + init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3, + proto_name = <<"MQIsdp">>}); +init([{proto_ver, v4} | Opts], State) -> + init(Opts, State#state{proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>}); +init([{proto_ver, v5} | Opts], State) -> + init(Opts, State#state{proto_ver = ?MQTT_PROTO_V5, + proto_name = <<"MQTT">>}); +init([{will_topic, Topic} | Opts], State = #state{will_msg = WillMsg}) -> + WillMsg1 = init_will_msg({topic, Topic}, WillMsg), + init(Opts, State#state{will_flag = true, will_msg = WillMsg1}); +init([{will_props, Properties} | Opts], State = #state{will_msg = WillMsg}) -> + init(Opts, State#state{will_msg = init_will_msg({props, Properties}, WillMsg)}); +init([{will_payload, Payload} | Opts], State = #state{will_msg = WillMsg}) -> + init(Opts, State#state{will_msg = init_will_msg({payload, Payload}, WillMsg)}); +init([{will_retain, Retain} | Opts], State = #state{will_msg = WillMsg}) -> + init(Opts, State#state{will_msg = init_will_msg({retain, Retain}, WillMsg)}); +init([{will_qos, QoS} | Opts], State = #state{will_msg = WillMsg}) -> + init(Opts, State#state{will_msg = init_will_msg({qos, QoS}, WillMsg)}); +init([{connect_timeout, Timeout}| Opts], State) -> + init(Opts, State#state{connect_timeout = timer:seconds(Timeout)}); +init([{ack_timeout, Timeout}| Opts], State) -> + init(Opts, State#state{ack_timeout = timer:seconds(Timeout)}); +init([force_ping | Opts], State) -> + init(Opts, State#state{force_ping = true}); +init([{force_ping, ForcePing} | Opts], State) when is_boolean(ForcePing) -> + init(Opts, State#state{force_ping = ForcePing}); +init([{properties, Properties} | Opts], State = #state{properties = InitProps}) -> + init(Opts, State#state{properties = maps:merge(InitProps, Properties)}); +init([{max_inflight, infinity} | Opts], State) -> + init(Opts, State#state{max_inflight = infinity, + inflight = #{}}); +init([{max_inflight, I} | Opts], State) when is_integer(I) -> + init(Opts, State#state{max_inflight = I, + inflight = #{}}); +init([auto_ack | Opts], State) -> + init(Opts, State#state{auto_ack = true}); +init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) -> + init(Opts, State#state{auto_ack = AutoAck}); +init([{retry_interval, I} | Opts], State) -> + init(Opts, State#state{retry_interval = timer:seconds(I)}); +init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> + init(Opts, State#state{bridge_mode = Mode}); +init([_Opt | Opts], State) -> + init(Opts, State). + +init_will_msg({topic, Topic}, WillMsg) -> + WillMsg#mqtt_msg{topic = iolist_to_binary(Topic)}; +init_will_msg({props, Props}, WillMsg) -> + WillMsg#mqtt_msg{props = Props}; +init_will_msg({payload, Payload}, WillMsg) -> + WillMsg#mqtt_msg{payload = iolist_to_binary(Payload)}; +init_will_msg({retain, Retain}, WillMsg) when is_boolean(Retain) -> + WillMsg#mqtt_msg{retain = Retain}; +init_will_msg({qos, QoS}, WillMsg) -> + WillMsg#mqtt_msg{qos = ?QOS_I(QoS)}. + +init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> + MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), + ParseState = emqtt_frame:initial_parse_state( + #{max_size => MaxSize, version => Ver}), + State#state{parse_state = ParseState}. + +merge_opts(Defaults, Options) -> + lists:foldl( + fun({Opt, Val}, Acc) -> + lists:keystore(Opt, 1, Acc, {Opt, Val}); + (Opt, Acc) -> + lists:usort([Opt | Acc]) + end, Defaults, Options). + +callback_mode() -> state_functions. + +initialized({call, From}, connect, State = #state{sock_opts = SockOpts, connect_timeout = Timeout}) -> + case sock_connect(hosts(State), SockOpts, Timeout) of + {ok, Sock} -> + case mqtt_connect(run_sock(State#state{socket = Sock})) of + {ok, NewState} -> + {next_state, waiting_for_connack, + add_call(new_call(connect, From), NewState), [Timeout]}; + Error = {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, Error}]} + end; + Error = {error, Reason} -> + {stop_and_reply, {shutdown, Reason}, [{reply, From, Error}]} + end; + +initialized(EventType, EventContent, State) -> + handle_event(EventType, EventContent, initialized, State). + +mqtt_connect(State = #state{clientid = ClientId, + clean_start = CleanStart, + bridge_mode = IsBridge, + username = Username, + password = Password, + proto_ver = ProtoVer, + proto_name = ProtoName, + keepalive = KeepAlive, + will_flag = WillFlag, + will_msg = WillMsg, + properties = Properties}) -> + ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, + ConnProps = emqtt_props:filter(?CONNECT, Properties), + send(?CONNECT_PACKET( + #mqtt_packet_connect{proto_ver = ProtoVer, + proto_name = ProtoName, + is_bridge = IsBridge, + clean_start = CleanStart, + will_flag = WillFlag, + will_qos = WillQoS, + will_retain = WillRetain, + keepalive = KeepAlive, + properties = ConnProps, + clientid = ClientId, + will_props = WillProps, + will_topic = WillTopic, + will_payload = WillPayload, + username = Username, + password = Password}), State). + +waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, + SessPresent, + Properties), + State = #state{properties = AllProps, + clientid = ClientId}) -> + case take_call(connect, State) of + {value, #call{from = From}, State1} -> + AllProps1 = case Properties of + undefined -> AllProps; + _ -> maps:merge(AllProps, Properties) + end, + Reply = {ok, Properties}, + State2 = State1#state{clientid = assign_id(ClientId, AllProps1), + properties = AllProps1, + session_present = SessPresent}, + {next_state, connected, ensure_keepalive_timer(State2), + [{reply, From, Reply}]}; + false -> + {stop, bad_connack} + end; + +waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, + _SessPresent, + Properties), + State = #state{proto_ver = ProtoVer}) -> + Reason = reason_code_name(ReasonCode, ProtoVer), + case take_call(connect, State) of + {value, #call{from = From}, _State} -> + Reply = {error, {Reason, Properties}}, + {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]}; + false -> {stop, connack_error} + end; + +waiting_for_connack(timeout, _Timeout, State) -> + case take_call(connect, State) of + {value, #call{from = From}, _State} -> + Reply = {error, connack_timeout}, + {stop_and_reply, connack_timeout, [{reply, From, Reply}]}; + false -> {stop, connack_timeout} + end; + +waiting_for_connack(EventType, EventContent, State) -> + case take_call(connect, State) of + {value, #call{from = From}, _State} -> + case handle_event(EventType, EventContent, waiting_for_connack, State) of + {stop, Reason, State} -> + Reply = {error, {Reason, EventContent}}, + {stop_and_reply, Reason, [{reply, From, Reply}]}; + StateCallbackResult -> + StateCallbackResult + end; + false -> + {stop, connack_timeout} + end. + +connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) -> + {keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]}; + +connected({call, From}, info, State) -> + Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), + {keep_state_and_data, [{reply, From, Info}]}; + +connected({call, From}, pause, State) -> + {keep_state, State#state{paused = true}, [{reply, From, ok}]}; + +connected({call, From}, resume, State) -> + {keep_state, State#state{paused = false}, [{reply, From, ok}]}; + +connected({call, From}, clientid, #state{clientid = ClientId}) -> + {keep_state_and_data, [{reply, From, ClientId}]}; + +connected({call, From}, SubReq = {subscribe, Properties, Topics}, + State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> + case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of + {ok, NewState} -> + Call = new_call({subscribe, PacketId}, From, SubReq), + Subscriptions1 = + lists:foldl(fun({Topic, Opts}, Acc) -> + maps:put(Topic, Opts, Acc) + end, Subscriptions, Topics), + {keep_state, ensure_ack_timer(add_call(Call,NewState#state{subscriptions = Subscriptions1}))}; + Error = {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, Error}]} + end; + +connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) -> + case send(Msg, State) of + {ok, NewState} -> + {keep_state, NewState, [{reply, From, ok}]}; + Error = {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, Error}]} + end; + +connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, + State = #state{inflight = Inflight, last_packet_id = PacketId}) + when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> + Msg1 = Msg#mqtt_msg{packet_id = PacketId}, + case send(Msg1, State) of + {ok, NewState} -> + Inflight1 = maps:put(PacketId, {publish, Msg1, os:timestamp()}, Inflight), + State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}), + Actions = [{reply, From, {ok, PacketId}}], + case is_inflight_full(State1) of + true -> {next_state, inflight_full, State1, Actions}; + false -> {keep_state, State1, Actions} + end; + {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} + end; + +connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, + State = #state{last_packet_id = PacketId}) -> + case send(?UNSUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of + {ok, NewState} -> + Call = new_call({unsubscribe, PacketId}, From, UnsubReq), + {keep_state, ensure_ack_timer(add_call(Call, NewState))}; + Error = {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, Error}]} + end; + +connected({call, From}, ping, State) -> + case send(?PACKET(?PINGREQ), State) of + {ok, NewState} -> + Call = new_call(ping, From), + {keep_state, ensure_ack_timer(add_call(Call, NewState))}; + Error = {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, Error}]} + end; + +connected({call, From}, {disconnect, ReasonCode, Properties}, State) -> + case send(?DISCONNECT_PACKET(ReasonCode, Properties), State) of + {ok, NewState} -> + {stop_and_reply, normal, [{reply, From, ok}], NewState}; + Error = {error, Reason} -> + {stop_and_reply, Reason, [{reply, From, Error}]} + end; + +connected(cast, {puback, PacketId, ReasonCode, Properties}, State) -> + send_puback(?PUBACK_PACKET(PacketId, ReasonCode, Properties), State); + +connected(cast, {pubrec, PacketId, ReasonCode, Properties}, State) -> + send_puback(?PUBREC_PACKET(PacketId, ReasonCode, Properties), State); + +connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) -> + send_puback(?PUBREL_PACKET(PacketId, ReasonCode, Properties), State); + +connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> + send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); + +connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) -> + keep_state_and_data; + +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> + {keep_state, deliver(packet_to_msg(Packet), State)}; + +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> + publish_process(?QOS_1, Packet, State); + +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> + publish_process(?QOS_2, Packet, State); + +connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> + {keep_state, delete_inflight(PubAck, State)}; + +connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -> + NState = case maps:find(PacketId, Inflight) of + {ok, {publish, _Msg, _Ts}} -> + Inflight1 = maps:put(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), + State#state{inflight = Inflight1}; + {ok, {pubrel, _Ref, _Ts}} -> + lager:notice("[emqtt] Duplicated PUBREC Packet: ~p, client_id: ~p", [PacketId, State#state.clientid]), + State; + error -> + lager:warning("[emqtt] Unexpected PUBREC Packet: ~p, client_id: ~p", [PacketId, State#state.clientid]), + State + end, + send_puback(?PUBREL_PACKET(PacketId), NState); + +connected(cast, ?PUBREC_PACKET(PacketId, ReasonCode), State) -> + lager:notice("[emqtt] Duplicated PUBREC Packet: ~p, reason_code: ~p, client_id: ~p", [PacketId, ReasonCode, State#state.clientid]), + keep_state_and_data; + +%%TODO::... if auto_ack is false, should we take PacketId from the map? +connected(cast, ?PUBREL_PACKET(PacketId), + State = #state{awaiting_rel = AwaitingRel, auto_ack = AutoAck}) -> + case maps:take(PacketId, AwaitingRel) of + {Packet, AwaitingRel1} -> + NewState = deliver(packet_to_msg(Packet), State#state{awaiting_rel = AwaitingRel1}), + case AutoAck of + true -> send_puback(?PUBCOMP_PACKET(PacketId), NewState); + false -> {keep_state, NewState} + end; + error -> + lager:warning("[emqtt] Unexpected PUBREL: ~p, client_id: ~p", [PacketId, State#state.clientid]), + keep_state_and_data + end; + +connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> + {keep_state, delete_inflight(PubComp, State)}; + +connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), + State = #state{subscriptions = _Subscriptions}) -> + case take_call({subscribe, PacketId}, State) of + {value, #call{from = From}, NewState} -> + %%TODO: Merge reason codes to subscriptions? + Reply = {ok, Properties, ReasonCodes}, + {keep_state, NewState, [{reply, From, Reply}]}; + false -> + keep_state_and_data + end; + +connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), + State = #state{subscriptions = Subscriptions}) -> + case take_call({unsubscribe, PacketId}, State) of + {value, #call{from = From, req = {_, _, Topics}}, NewState} -> + Subscriptions1 = + lists:foldl(fun(Topic, Acc) -> + maps:remove(Topic, Acc) + end, Subscriptions, Topics), + {keep_state, NewState#state{subscriptions = Subscriptions1}, + [{reply, From, {ok, Properties, ReasonCodes}}]}; + false -> + keep_state_and_data + end; + +connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) -> + keep_state_and_data; +connected(cast, ?PACKET(?PINGRESP), State) -> + case take_call(ping, State) of + {value, #call{from = From}, NewState} -> + {keep_state, NewState, [{reply, From, pong}]}; + false -> + keep_state_and_data + end; + +connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> + {stop, {disconnected, ReasonCode, Properties}, State}; + +connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) -> + case send(?PACKET(?PINGREQ), State) of + {ok, NewState} -> + {keep_state, ensure_keepalive_timer(NewState)}; + Error -> {stop, Error} + end; + +connected(info, {timeout, TRef, keepalive}, State = #state{socket = Sock, paused = Paused, keepalive_timer = TRef}) -> + case (not Paused) andalso should_ping(Sock) of + true -> + case send(?PACKET(?PINGREQ), State) of + {ok, NewState} -> + {ok, [{send_oct, Val}]} = emqtt_sock:getstat(Sock, [send_oct]), + put(send_oct, Val), + {keep_state, ensure_keepalive_timer(NewState), [hibernate]}; + Error -> {stop, Error} + end; + false -> + {keep_state, ensure_keepalive_timer(State), [hibernate]}; + {error, Reason} -> + {stop, Reason} + end; + +connected(info, {timeout, TRef, ack}, State = #state{ack_timer = TRef, + ack_timeout = Timeout, + pending_calls = Calls}) -> + NewState = State#state{ack_timer = undefined, + pending_calls = timeout_calls(Timeout, Calls)}, + {keep_state, ensure_ack_timer(NewState)}; + +connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, inflight = Inflight}) -> + case maps:size(Inflight) == 0 of + true -> {keep_state, State#state{retry_timer = undefined}}; + false -> retry_send(State) + end; + +connected(EventType, EventContent, Data) -> + handle_event(EventType, EventContent, connected, Data). + +inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> + {keep_state_and_data, [postpone]}; +inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> + delete_inflight_when_full(PubAck, State); +inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> + delete_inflight_when_full(PubComp, State); +inflight_full(EventType, EventContent, Data) -> + %% inflight_full is a sub-state of connected state, + %% delegate all other events to connected state. + connected(EventType, EventContent, Data). + +handle_event({call, From}, stop, _StateName, _State) -> + {stop_and_reply, normal, [{reply, From, ok}]}; + +handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> + %lager:debug("[emqtt] RECV Data: ~p, client_id: ~p", [Data, State#state.clientid]), + process_incoming(Data, [], run_sock(State)); + +handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> + lager:error("[emqtt] The connection error occured ~p, reason:~p, client_id: ~p", [Error, Reason, State#state.clientid]), + {stop, {shutdown, Reason}, State}; + +handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> + lager:debug("[emqtt] sokcet closed: ~p, client_id: ~p", [Closed, State#state.clientid]), + {stop, {shutdown, Closed}, State}; + +handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> + lager:warning("[emqtt] Got EXIT from owner, Reason: ~p, client_id: ~p", [Reason, State#state.clientid]), + {stop, {shutdown, Reason}, State}; + +handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> + keep_state_and_data; + +handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> + lager:error("[emqtt] Got tcp error: ~p, client_id: ~p", [Reason, State#state.clientid]), + {stop, {shutdown, Reason}, State}; + +handle_event(info, EventContent = {'EXIT', Pid, Reason}, StateName, State) -> + lager:warning("[emqtt] State: ~s, Unexpected Event: (info, ~p), from pid: ~p, client_id: ~p", [StateName, EventContent, Pid, State#state.clientid]), + {stop, {shutdown, Reason}, State}; + +handle_event(EventType, EventContent, StateName, State) -> + lager:error("[emqtt] State: ~s, Unexpected Event: (~p, ~p), client_id: ~p", [StateName, EventType, EventContent, State#state.clientid]), + keep_state_and_data. + +%% Mandatory callback functions +terminate(Reason, _StateName, State = #state{socket = Socket}) -> + case Reason of + {disconnected, ReasonCode, Properties} -> + %% backward compatible + ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}); + _ -> + ok = eval_msg_handler(State, disconnected, Reason) + end, + case Socket =:= undefined of + true -> ok; + _ -> emqtt_sock:close(Socket) + end. + +code_change(_Vsn, State, Data, _Extra) -> + {ok, State, Data}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +should_ping(Sock) -> + case emqtt_sock:getstat(Sock, [send_oct]) of + {ok, [{send_oct, Val}]} -> + OldVal = get(send_oct), put(send_oct, Val), + OldVal == undefined orelse OldVal == Val; + Error = {error, _Reason} -> + Error + end. + +is_inflight_full(#state{max_inflight = infinity}) -> + false; +is_inflight_full(#state{max_inflight = MaxLimit, inflight = Inflight}) -> + maps:size(Inflight) >= MaxLimit. + +delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), + State = #state{inflight = Inflight}) -> + case maps:find(PacketId, Inflight) of + {ok, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), + State#state{inflight = maps:remove(PacketId, Inflight)}; + error -> + lager:warning("[emqtt] Unexpected PUBACK: ~p, client_id: ~p", [PacketId, State#state.clientid]), + State + end; +delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), + State = #state{inflight = Inflight}) -> + case maps:find(PacketId, Inflight) of + {ok, {pubrel, _PacketId, _Ts}} -> + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), + State#state{inflight = maps:remove(PacketId, Inflight)}; + error -> + lager:warning("[emqtt] Unexpected PUBCOMP Packet: ~p, client_id: ~p", [PacketId, State#state.clientid]), + State + end. + +delete_inflight_when_full(Packet, State) -> + State1 = delete_inflight(Packet, State), + case is_inflight_full(State1) of + true -> + {keep_state, State1}; + false -> + {next_state, connected, State1} + end. + +assign_id(?NO_CLIENT_ID, Props) -> + case maps:find('Assigned-Client-Identifier', Props) of + {ok, Value} -> + Value; + _ -> + error(bad_client_id) + end; +assign_id(Id, _Props) -> + Id. + +publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), + State0 = #state{auto_ack = AutoAck}) -> + State = deliver(packet_to_msg(Packet), State0), + case AutoAck of + true -> + send_puback(?PUBACK_PACKET(PacketId), State); + false -> + {keep_state, State} + end; +publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), + State = #state{awaiting_rel = AwaitingRel}) -> + case send_puback(?PUBREC_PACKET(PacketId), State) of + {keep_state, NewState} -> + AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel), + {keep_state, NewState#state{awaiting_rel = AwaitingRel1}}; + Stop -> + Stop + end. + +ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> + ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs}); +ensure_keepalive_timer(State = #state{keepalive = 0}) -> + State; +ensure_keepalive_timer(State = #state{keepalive = I}) -> + ensure_keepalive_timer(timer:seconds(I), State). +ensure_keepalive_timer(I, State) when is_integer(I) -> + State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}. + +new_call(Id, From) -> + new_call(Id, From, undefined). +new_call(Id, From, Req) -> + #call{id = Id, from = From, req = Req, ts = os:timestamp()}. + +add_call(Call, Data = #state{pending_calls = Calls}) -> + Data#state{pending_calls = [Call | Calls]}. + +take_call(Id, Data = #state{pending_calls = Calls}) -> + case lists:keytake(Id, #call.id, Calls) of + {value, Call, Left} -> + {value, Call, Data#state{pending_calls = Left}}; + false -> false + end. + +timeout_calls(Timeout, Calls) -> + timeout_calls(os:timestamp(), Timeout, Calls). +timeout_calls(Now, Timeout, Calls) -> + lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) -> + case (timer:now_diff(Now, Ts) div 1000) >= Timeout of + true -> + gen_statem:reply(From, {error, ack_timeout}), + Acc; + false -> [C | Acc] + end + end, [], Calls). + +ensure_ack_timer(State = #state{ack_timer = undefined, + ack_timeout = Timeout, + pending_calls = Calls}) when length(Calls) > 0 -> + State#state{ack_timer = erlang:start_timer(Timeout, self(), ack)}; +ensure_ack_timer(State) -> State. + +ensure_retry_timer(State = #state{retry_interval = Interval}) -> + do_ensure_retry_timer(Interval, State). + +do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) + when Interval > 0 -> + State#state{retry_timer = erlang:start_timer(Interval, self(), retry)}; +do_ensure_retry_timer(_Interval, State) -> + State. + +retry_send(State = #state{inflight = Inflight}) -> + SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, + Msgs = lists:sort(SortFun, maps:values(Inflight)), + retry_send(Msgs, os:timestamp(), State ). + +retry_send([], _Now, State) -> + {keep_state, ensure_retry_timer(State)}; +retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interval}) -> + Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms + case (Diff >= Interval) of + true -> case retry_send(Type, Msg, Now, State) of + {ok, NewState} -> retry_send(Msgs, Now, NewState); + {error, Error} -> {stop, Error} + end; + false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)} + end. + +retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, + Now, State = #state{inflight = Inflight}) -> + Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)}, + case send(Msg1, State) of + {ok, NewState} -> + Inflight1 = maps:put(PacketId, {publish, Msg1, Now}, Inflight), + {ok, NewState#state{inflight = Inflight1}}; + Error = {error, _Reason} -> + Error + end; +retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> + case send(?PUBREL_PACKET(PacketId), State) of + {ok, NewState} -> + Inflight1 = maps:put(PacketId, {pubrel, PacketId, Now}, Inflight), + {ok, NewState#state{inflight = Inflight1}}; + Error = {error, _Reason} -> + Error + end. + +deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, + topic = Topic, props = Props, payload = Payload}, + State) -> + Msg = #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, + topic => Topic, properties => Props, payload => Payload, + client_pid => self()}, + ok = eval_msg_handler(State, publish, Msg), + State. + +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, + owner = Owner}, + disconnected, {ReasonCode, Properties}) when is_integer(ReasonCode) -> + %% Special handling for disconnected message when there is no handler callback + Owner ! {disconnected, ReasonCode, Properties}, + ok; +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR}, + disconnected, _OtherReason) -> + %% do nothing to be backward compatible + ok; +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, + owner = Owner}, Kind, Msg) -> + Owner ! {Kind, Msg}, + ok; +eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) -> + F = maps:get(Kind, Handler), + _ = apply_handler_function(F, Msg), + ok. + +apply_handler_function(F, Msg) + when is_function(F) -> + erlang:apply(F, [Msg]); +apply_handler_function({F, A}, Msg) + when is_function(F), + is_list(A) -> + erlang:apply(F, [Msg] ++ A); +apply_handler_function({M, F, A}, Msg) + when is_atom(M), + is_atom(F), + is_list(A) -> + erlang:apply(M, F, [Msg] ++ A). + +packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = Dup, + qos = QoS, + retain = R}, + variable = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + properties = Props}, + payload = Payload}) -> + #mqtt_msg{qos = QoS, retain = R, dup = Dup, packet_id = PacketId, + topic = Topic, props = Props, payload = Payload}. + +msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, + topic = Topic, props = Props, payload = Payload}) -> + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = QoS, + retain = Retain, + dup = Dup}, + variable = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + properties = Props}, + payload = Payload}. + +%%-------------------------------------------------------------------- +%% Socket Connect/Send + +sock_connect(Hosts, SockOpts, Timeout) -> + sock_connect(Hosts, SockOpts, Timeout, {error, no_hosts}). + +sock_connect([], _SockOpts, _Timeout, LastErr) -> + LastErr; +sock_connect([{Host, Port} | Hosts], SockOpts, Timeout, _LastErr) -> + case emqtt_sock:connect(Host, Port, SockOpts, Timeout) of + {ok, SockOrPid} -> + {ok, SockOrPid}; + Error = {error, _Reason} -> + sock_connect(Hosts, SockOpts, Timeout, Error) + end. + +hosts(#state{hosts = [], host = Host, port = Port}) -> + [{Host, Port}]; +hosts(#state{hosts = Hosts}) -> Hosts. + +send_puback(Packet, State) -> + case send(Packet, State) of + {ok, NewState} -> {keep_state, NewState}; + {error, Reason} -> {stop, {shutdown, Reason}} + end. + +send(Msg, State) when is_record(Msg, mqtt_msg) -> + send(msg_to_packet(Msg), State); + +send(Packet, State = #state{socket = Sock, proto_ver = Ver}) + when is_record(Packet, mqtt_packet) -> + Data = emqtt_frame:serialize(Packet, Ver), + case emqtt_sock:send(Sock, Data) of + ok -> + {ok, bump_last_packet_id(State)}; + Error -> + Error + end. + +run_sock(State = #state{socket = Sock}) -> + emqtt_sock:setopts(Sock, [{active, once}]), State. + +%%-------------------------------------------------------------------- +%% Process incomming + +process_incoming(<<>>, Packets, State) -> + {keep_state, State, next_events(Packets)}; + +process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> + try emqtt_frame:parse(Bytes, ParseState) of + {ok, Packet, Rest, NParseState} -> + process_incoming(Rest, [Packet|Packets], State#state{parse_state = NParseState}); + {more, NParseState} -> + {keep_state, State#state{parse_state = NParseState}, next_events(Packets)} + catch + error:Error -> + {stop, Error} + end. + +-compile({inline, [next_events/1]}). +next_events([]) -> []; +next_events([Packet]) -> + {next_event, cast, Packet}; +next_events(Packets) -> + [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. + +%%-------------------------------------------------------------------- +%% packet_id generation + +bump_last_packet_id(State = #state{last_packet_id = Id}) -> + State#state{last_packet_id = next_packet_id(Id)}. + +-spec next_packet_id(packet_id()) -> packet_id(). +next_packet_id(?MAX_PACKET_ID) -> + 1; +next_packet_id(Id) -> + Id + 1. + +%%-------------------------------------------------------------------- +%% ReasonCode Name + +reason_code_name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> + reason_code_name(I); +reason_code_name(0, _Ver) -> connection_accepted; +reason_code_name(1, _Ver) -> unacceptable_protocol_version; +reason_code_name(2, _Ver) -> client_identifier_not_valid; +reason_code_name(3, _Ver) -> server_unavaliable; +reason_code_name(4, _Ver) -> malformed_username_or_password; +reason_code_name(5, _Ver) -> unauthorized_client; +reason_code_name(_, _Ver) -> unknown_error. + +reason_code_name(16#00) -> success; +reason_code_name(16#01) -> granted_qos1; +reason_code_name(16#02) -> granted_qos2; +reason_code_name(16#04) -> disconnect_with_will_message; +reason_code_name(16#10) -> no_matching_subscribers; +reason_code_name(16#11) -> no_subscription_existed; +reason_code_name(16#18) -> continue_authentication; +reason_code_name(16#19) -> re_authenticate; +reason_code_name(16#80) -> unspecified_error; +reason_code_name(16#81) -> malformed_Packet; +reason_code_name(16#82) -> protocol_error; +reason_code_name(16#83) -> implementation_specific_error; +reason_code_name(16#84) -> unsupported_protocol_version; +reason_code_name(16#85) -> client_identifier_not_valid; +reason_code_name(16#86) -> bad_username_or_password; +reason_code_name(16#87) -> not_authorized; +reason_code_name(16#88) -> server_unavailable; +reason_code_name(16#89) -> server_busy; +reason_code_name(16#8A) -> banned; +reason_code_name(16#8B) -> server_shutting_down; +reason_code_name(16#8C) -> bad_authentication_method; +reason_code_name(16#8D) -> keepalive_timeout; +reason_code_name(16#8E) -> session_taken_over; +reason_code_name(16#8F) -> topic_filter_invalid; +reason_code_name(16#90) -> topic_name_invalid; +reason_code_name(16#91) -> packet_identifier_inuse; +reason_code_name(16#92) -> packet_identifier_not_found; +reason_code_name(16#93) -> receive_maximum_exceeded; +reason_code_name(16#94) -> topic_alias_invalid; +reason_code_name(16#95) -> packet_too_large; +reason_code_name(16#96) -> message_rate_too_high; +reason_code_name(16#97) -> quota_exceeded; +reason_code_name(16#98) -> administrative_action; +reason_code_name(16#99) -> payload_format_invalid; +reason_code_name(16#9A) -> retain_not_supported; +reason_code_name(16#9B) -> qos_not_supported; +reason_code_name(16#9C) -> use_another_server; +reason_code_name(16#9D) -> server_moved; +reason_code_name(16#9E) -> shared_subscriptions_not_supported; +reason_code_name(16#9F) -> connection_rate_exceeded; +reason_code_name(16#A0) -> maximum_connect_time; +reason_code_name(16#A1) -> subscription_identifiers_not_supported; +reason_code_name(16#A2) -> wildcard_subscriptions_not_supported; +reason_code_name(_Code) -> unknown_error. diff --git a/src/emqtt_frame.erl b/src/emqtt_frame.erl new file mode 100644 index 0000000..96393f3 --- /dev/null +++ b/src/emqtt_frame.erl @@ -0,0 +1,738 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqtt_frame). + +-include("emqtt.hrl"). + +-export([initial_parse_state/0, initial_parse_state/1]). + +-export([parse/1, parse/2, serialize_fun/0, serialize_fun/1, serialize/1, serialize/2 ]). + +-export_type([options/0, parse_state/0, parse_result/0, serialize_fun/0]). + +-type(version() :: ?MQTT_PROTO_V3 + | ?MQTT_PROTO_V4 + | ?MQTT_PROTO_V5). + +-type(options() :: #{strict_mode => boolean(), + max_size => 1..?MAX_PACKET_SIZE, + version => version()}). + +-opaque(parse_state() :: {none, options()} | cont_fun()). + +-opaque(parse_result() :: {more, cont_fun()} + | {ok, #mqtt_packet{}, binary(), parse_state()}). + +-type(cont_fun() :: fun((binary()) -> parse_result())). + +-type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). + +-define(none(Options), {none, Options}). + +-define(DEFAULT_OPTIONS, + #{strict_mode => false, + max_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4 + }). + +%%-------------------------------------------------------------------- +%% Init Parse State +%%-------------------------------------------------------------------- + +-spec(initial_parse_state() -> {none, options()}). +initial_parse_state() -> + initial_parse_state(#{}). + +-spec(initial_parse_state(options()) -> {none, options()}). +initial_parse_state(Options) when is_map(Options) -> + ?none(merge_opts(Options)). + +%% @pivate +merge_opts(Options) -> + maps:merge(?DEFAULT_OPTIONS, Options). + +%%-------------------------------------------------------------------- +%% Parse MQTT Frame +%%-------------------------------------------------------------------- + +-spec(parse(binary()) -> parse_result()). +parse(Bin) -> + parse(Bin, initial_parse_state()). + +-spec(parse(binary(), parse_state()) -> parse_result()). +parse(<<>>, {none, Options}) -> + {more, fun(Bin) -> parse(Bin, {none, Options}) end}; +parse(<>, + {none, Options = #{strict_mode := StrictMode}}) -> + %% Validate header if strict mode. + StrictMode andalso validate_header(Type, Dup, QoS, Retain), + Header = #mqtt_packet_header{type = Type, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) + }, + Header1 = case fixqos(Type, QoS) of + QoS -> Header; + FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} + end, + parse_remaining_len(Rest, Header1, Options); +parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> + Cont(Bin). + +parse_remaining_len(<<>>, Header, Options) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; +parse_remaining_len(Rest, Header, Options) -> + parse_remaining_len(Rest, Header, 1, 0, Options). + +parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> + error(frame_too_large); +parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; +%% Match DISCONNECT without payload +parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> + Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), + {ok, Packet, Rest, ?none(Options)}; +%% Match PINGREQ. +parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame(Rest, Header, 0, Options); +%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... +parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame(Rest, Header, 2, Options); +parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> + parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, + Options = #{max_size := MaxSize}) -> + FrameLen = Value + Len * Multiplier, + if + FrameLen > MaxSize -> + error(frame_too_large); + true -> + parse_frame(Rest, Header, FrameLen, Options) + end. + +parse_frame(Bin, Header, 0, Options) -> + {ok, packet(Header), Bin, ?none(Options)}; + +parse_frame(Bin, Header, Length, Options) -> + case Bin of + <> -> + case parse_packet(Header, FrameBin, Options) of + {Variable, Payload} -> + {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; + Variable = #mqtt_packet_connect{proto_ver = Ver} -> + {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; + Variable -> + {ok, packet(Header, Variable), Rest, ?none(Options)} + end; + TooShortBin -> + {more, fun(BinMore) -> + parse_frame(<>, Header, Length, Options) + end} + end. + +-compile({inline, [packet/1, packet/2, packet/3]}). +packet(Header) -> + #mqtt_packet{header = Header}. +packet(Header, Variable) -> + #mqtt_packet{header = Header, variable = Variable}. +packet(Header, Variable, Payload) -> + #mqtt_packet{header = Header, variable = Variable, payload = Payload}. + +parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> + {ProtoName, Rest} = parse_utf8_string(FrameBin), + <> = Rest, + % Note: Crash when reserved flag doesn't equal to 0, there is no strict + % compliance with the MQTT5.0. + <> = Rest1, + + {Properties, Rest3} = parse_properties(Rest2, ProtoVer), + {ClientId, Rest4} = parse_utf8_string(Rest3), + ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = (BridgeTag =:= 8), + clean_start = bool(CleanStart), + will_flag = bool(WillFlag), + will_qos = WillQoS, + will_retain = bool(WillRetain), + keepalive = KeepAlive, + properties = Properties, + clientid = ClientId + }, + {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), + {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), + {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), + ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword}; + +parse_packet(#mqtt_packet_header{type = ?CONNACK}, + <>, #{version := Ver}) -> + {Properties, <<>>} = parse_properties(Rest, Ver), + #mqtt_packet_connack{ack_flags = AckFlags, + reason_code = ReasonCode, + properties = Properties + }; + +parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, + #{strict_mode := StrictMode, version := Ver}) -> + {TopicName, Rest} = parse_utf8_string(Bin), + {PacketId, Rest1} = case QoS of + ?QOS_0 -> {undefined, Rest}; + _ -> parse_packet_id(Rest) + end, + (PacketId =/= undefined) andalso + StrictMode andalso validate_packet_id(PacketId), + {Properties, Payload} = parse_properties(Rest1, Ver), + Publish = #mqtt_packet_publish{topic_name = TopicName, + packet_id = PacketId, + properties = Properties + }, + {Publish, Payload}; + +parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode}) + when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> + StrictMode andalso validate_packet_id(PacketId), + #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; + +parse_packet(#mqtt_packet_header{type = PubAck}, <>, + #{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5}) + when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> + StrictMode andalso validate_packet_id(PacketId), + {Properties, <<>>} = parse_properties(Rest, Ver), + #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties + }; + +parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + StrictMode andalso validate_packet_id(PacketId), + {Properties, Rest1} = parse_properties(Rest, Ver), + TopicFilters = parse_topic_filters(subscribe, Rest1), + ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]), + #mqtt_packet_subscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters + }; + +parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + StrictMode andalso validate_packet_id(PacketId), + {Properties, Rest1} = parse_properties(Rest, Ver), + ReasonCodes = parse_reason_codes(Rest1), + #mqtt_packet_suback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes + }; + +parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + StrictMode andalso validate_packet_id(PacketId), + {Properties, Rest1} = parse_properties(Rest, Ver), + TopicFilters = parse_topic_filters(unsubscribe, Rest1), + #mqtt_packet_unsubscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters + }; + +parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, + #{strict_mode := StrictMode}) -> + StrictMode andalso validate_packet_id(PacketId), + #mqtt_packet_unsuback{packet_id = PacketId}; + +parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + StrictMode andalso validate_packet_id(PacketId), + {Properties, Rest1} = parse_properties(Rest, Ver), + ReasonCodes = parse_reason_codes(Rest1), + #mqtt_packet_unsuback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes + }; + +parse_packet(#mqtt_packet_header{type = ?DISCONNECT}, <>, + #{version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #mqtt_packet_disconnect{reason_code = ReasonCode, + properties = Properties + }; + +parse_packet(#mqtt_packet_header{type = ?AUTH}, <>, + #{version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}. + +parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, + proto_ver = Ver}, Bin) -> + {Props, Rest} = parse_properties(Bin, Ver), + {Topic, Rest1} = parse_utf8_string(Rest), + {Payload, Rest2} = parse_binary_data(Rest1), + {Packet#mqtt_packet_connect{will_props = Props, + will_topic = Topic, + will_payload = Payload + }, Rest2}; +parse_will_message(Packet, Bin) -> {Packet, Bin}. + +-compile({inline, [parse_packet_id/1]}). +parse_packet_id(<>) -> + {PacketId, Rest}. + +parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> + {undefined, Bin}; +%% TODO: version mess? +parse_properties(<<>>, ?MQTT_PROTO_V5) -> + {#{}, <<>>}; +parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) -> + {#{}, Rest}; +parse_properties(Bin, ?MQTT_PROTO_V5) -> + {Len, Rest} = parse_variable_byte_integer(Bin), + <> = Rest, + {parse_property(PropsBin, #{}), Rest1}. + +parse_property(<<>>, Props) -> + Props; +parse_property(<<16#01, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}); +parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}); +parse_property(<<16#03, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Content-Type' => Val}); +parse_property(<<16#08, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Response-Topic' => Val}); +parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Correlation-Data' => Val}); +parse_property(<<16#0B, Bin/binary>>, Props) -> + {Val, Rest} = parse_variable_byte_integer(Bin), + parse_property(Rest, Props#{'Subscription-Identifier' => Val}); +parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); +parse_property(<<16#12, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); +parse_property(<<16#13, Val:16, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); +parse_property(<<16#15, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Authentication-Method' => Val}); +parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Authentication-Data' => Val}); +parse_property(<<16#17, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Request-Problem-Information' => Val}); +parse_property(<<16#18, Val:32, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Will-Delay-Interval' => Val}); +parse_property(<<16#19, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Request-Response-Information' => Val}); +parse_property(<<16#1A, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Response-Information' => Val}); +parse_property(<<16#1C, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Server-Reference' => Val}); +parse_property(<<16#1F, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf8_string(Bin), + parse_property(Rest, Props#{'Reason-String' => Val}); +parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Receive-Maximum' => Val}); +parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}); +parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Topic-Alias' => Val}); +parse_property(<<16#24, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Maximum-QoS' => Val}); +parse_property(<<16#25, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Retain-Available' => Val}); +parse_property(<<16#26, Bin/binary>>, Props) -> + {Pair, Rest} = parse_utf8_pair(Bin), + case maps:find('User-Property', Props) of + {ok, UserProps} -> + UserProps1 = lists:append(UserProps, [Pair]), + parse_property(Rest, Props#{'User-Property' := UserProps1}); + error -> + parse_property(Rest, Props#{'User-Property' => [Pair]}) + end; +parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}); +parse_property(<<16#28, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); +parse_property(<<16#29, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); +parse_property(<<16#2A, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). + +parse_variable_byte_integer(Bin) -> + parse_variable_byte_integer(Bin, 1, 0). +parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> + parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> + {Value + Len * Multiplier, Rest}. + +parse_topic_filters(subscribe, Bin) -> + [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}} + || <> <= Bin]; + +parse_topic_filters(unsubscribe, Bin) -> + [Topic || <> <= Bin]. + +parse_reason_codes(Bin) -> + [Code || <> <= Bin]. + +parse_utf8_pair(<>) -> + {{Key, Val}, Rest}. + +parse_utf8_string(Bin, false) -> + {undefined, Bin}; +parse_utf8_string(Bin, true) -> + parse_utf8_string(Bin). + +parse_utf8_string(<>) -> + {Str, Rest}. + +parse_binary_data(<>) -> + {Data, Rest}. + +%%-------------------------------------------------------------------- +%% Serialize MQTT Packet +%%-------------------------------------------------------------------- + +serialize_fun() -> serialize_fun(?DEFAULT_OPTIONS). + +serialize_fun(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> + MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), + serialize_fun(#{version => ProtoVer, max_size => MaxSize}); + +serialize_fun(#{version := Ver, max_size := MaxSize}) -> + fun(Packet) -> + IoData = serialize(Packet, Ver), + case is_too_large(IoData, MaxSize) of + true -> <<>>; + false -> IoData + end + end. + +-spec(serialize(#mqtt_packet{}) -> iodata()). +serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4). + +-spec(serialize(#mqtt_packet{}, version()) -> iodata()). +serialize(#mqtt_packet{header = Header, + variable = Variable, + payload = Payload}, Ver) -> + serialize(Header, serialize_variable(Variable, Ver), serialize_payload(Payload)). + +serialize(#mqtt_packet_header{type = Type, + dup = Dup, + qos = QoS, + retain = Retain + }, VariableBin, PayloadBin) + when ?CONNECT =< Type andalso Type =< ?AUTH -> + Len = iolist_size(VariableBin) + iolist_size(PayloadBin), + [<>, + serialize_remaining_len(Len), VariableBin, PayloadBin]. + +serialize_variable(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = IsBridge, + clean_start = CleanStart, + will_flag = WillFlag, + will_qos = WillQoS, + will_retain = WillRetain, + keepalive = KeepAlive, + properties = Properties, + clientid = ClientId, + will_props = WillProps, + will_topic = WillTopic, + will_payload = WillPayload, + username = Username, + password = Password}, _Ver) -> + [serialize_binary_data(ProtoName), + <<(case IsBridge of + true -> 16#80 + ProtoVer; + false -> ProtoVer + end):8, + (flag(Username)):1, + (flag(Password)):1, + (flag(WillRetain)):1, + WillQoS:2, + (flag(WillFlag)):1, + (flag(CleanStart)):1, + 0:1, + KeepAlive:16/big-unsigned-integer>>, + serialize_properties(Properties, ProtoVer), + serialize_utf8_string(ClientId), + case WillFlag of + true -> [serialize_properties(WillProps, ProtoVer), + serialize_utf8_string(WillTopic), + serialize_binary_data(WillPayload)]; + false -> <<>> + end, + serialize_utf8_string(Username, true), + serialize_utf8_string(Password, true)]; + +serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags, + reason_code = ReasonCode, + properties = Properties}, Ver) -> + [AckFlags, ReasonCode, serialize_properties(Properties, Ver)]; + +serialize_variable(#mqtt_packet_publish{topic_name = TopicName, + packet_id = PacketId, + properties = Properties}, Ver) -> + [serialize_utf8_string(TopicName), + if + PacketId =:= undefined -> <<>>; + true -> <> + end, + serialize_properties(Properties, Ver)]; + +serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver) + when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> + <>; +serialize_variable(#mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties + }, + Ver = ?MQTT_PROTO_V5) -> + [<>, ReasonCode, + serialize_properties(Properties, Ver)]; + +serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters}, Ver) -> + [<>, serialize_properties(Properties, Ver), + serialize_topic_filters(subscribe, TopicFilters, Ver)]; + +serialize_variable(#mqtt_packet_suback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes}, Ver) -> + [<>, serialize_properties(Properties, Ver), + serialize_reason_codes(ReasonCodes)]; + +serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters}, Ver) -> + [<>, serialize_properties(Properties, Ver), + serialize_topic_filters(unsubscribe, TopicFilters, Ver)]; + +serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes}, Ver) -> + [<>, serialize_properties(Properties, Ver), + serialize_reason_codes(ReasonCodes)]; + +serialize_variable(#mqtt_packet_disconnect{}, Ver) + when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> + <<>>; + +serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode, + properties = Properties}, + Ver = ?MQTT_PROTO_V5) -> + [ReasonCode, serialize_properties(Properties, Ver)]; +serialize_variable(#mqtt_packet_disconnect{}, _Ver) -> + <<>>; + +serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode, + properties = Properties}, + Ver = ?MQTT_PROTO_V5) -> + [ReasonCode, serialize_properties(Properties, Ver)]; + +serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) -> + <>; +serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) -> + <>; +serialize_variable(undefined, _Ver) -> + <<>>. + +serialize_payload(undefined) -> <<>>; +serialize_payload(Bin) -> Bin. + +serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 -> + <<>>; +serialize_properties(Props, ?MQTT_PROTO_V5) -> + serialize_properties(Props). + +serialize_properties(undefined) -> + <<0>>; +serialize_properties(Props) when map_size(Props) == 0 -> + <<0>>; +serialize_properties(Props) when is_map(Props) -> + Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>, + [serialize_variable_byte_integer(byte_size(Bin)), Bin]. + +serialize_property(_, undefined) -> + <<>>; +serialize_property('Payload-Format-Indicator', Val) -> + <<16#01, Val>>; +serialize_property('Message-Expiry-Interval', Val) -> + <<16#02, Val:32/big>>; +serialize_property('Content-Type', Val) -> + <<16#03, (serialize_utf8_string(Val))/binary>>; +serialize_property('Response-Topic', Val) -> + <<16#08, (serialize_utf8_string(Val))/binary>>; +serialize_property('Correlation-Data', Val) -> + <<16#09, (byte_size(Val)):16, Val/binary>>; +serialize_property('Subscription-Identifier', Val) -> + <<16#0B, (serialize_variable_byte_integer(Val))/binary>>; +serialize_property('Session-Expiry-Interval', Val) -> + <<16#11, Val:32/big>>; +serialize_property('Assigned-Client-Identifier', Val) -> + <<16#12, (serialize_utf8_string(Val))/binary>>; +serialize_property('Server-Keep-Alive', Val) -> + <<16#13, Val:16/big>>; +serialize_property('Authentication-Method', Val) -> + <<16#15, (serialize_utf8_string(Val))/binary>>; +serialize_property('Authentication-Data', Val) -> + <<16#16, (iolist_size(Val)):16, Val/binary>>; +serialize_property('Request-Problem-Information', Val) -> + <<16#17, Val>>; +serialize_property('Will-Delay-Interval', Val) -> + <<16#18, Val:32/big>>; +serialize_property('Request-Response-Information', Val) -> + <<16#19, Val>>; +serialize_property('Response-Information', Val) -> + <<16#1A, (serialize_utf8_string(Val))/binary>>; +serialize_property('Server-Reference', Val) -> + <<16#1C, (serialize_utf8_string(Val))/binary>>; +serialize_property('Reason-String', Val) -> + <<16#1F, (serialize_utf8_string(Val))/binary>>; +serialize_property('Receive-Maximum', Val) -> + <<16#21, Val:16/big>>; +serialize_property('Topic-Alias-Maximum', Val) -> + <<16#22, Val:16/big>>; +serialize_property('Topic-Alias', Val) -> + <<16#23, Val:16/big>>; +serialize_property('Maximum-QoS', Val) -> + <<16#24, Val>>; +serialize_property('Retain-Available', Val) -> + <<16#25, Val>>; +serialize_property('User-Property', {Key, Val}) -> + <<16#26, (serialize_utf8_pair({Key, Val}))/binary>>; +serialize_property('User-Property', Props) when is_list(Props) -> + << <<(serialize_property('User-Property', {Key, Val}))/binary>> + || {Key, Val} <- Props >>; +serialize_property('Maximum-Packet-Size', Val) -> + <<16#27, Val:32/big>>; +serialize_property('Wildcard-Subscription-Available', Val) -> + <<16#28, Val>>; +serialize_property('Subscription-Identifier-Available', Val) -> + <<16#29, Val>>; +serialize_property('Shared-Subscription-Available', Val) -> + <<16#2A, Val>>. + +serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> + << <<(serialize_utf8_string(Topic))/binary, + ?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >> + || {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}} <- TopicFilters >>; + +serialize_topic_filters(subscribe, TopicFilters, _Ver) -> + << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>> + || {Topic, #{qos := QoS}} <- TopicFilters >>; + +serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> + << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>. + +serialize_reason_codes(undefined) -> + <<>>; +serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) -> + << <> || Code <- ReasonCodes >>. + +serialize_utf8_pair({Name, Value}) -> + << (serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary >>. + +serialize_binary_data(Bin) -> + [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. + +serialize_utf8_string(undefined, false) -> + error(utf8_string_undefined); +serialize_utf8_string(undefined, true) -> + <<>>; +serialize_utf8_string(String, _AllowNull) -> + serialize_utf8_string(String). + +serialize_utf8_string(String) -> + StringBin = unicode:characters_to_binary(String), + Len = byte_size(StringBin), + true = (Len =< 16#ffff), + <>. + +serialize_remaining_len(I) -> + serialize_variable_byte_integer(I). + +serialize_variable_byte_integer(N) when N =< ?LOWBITS -> + <<0:1, N:7>>; +serialize_variable_byte_integer(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. + +%% Is the frame too large? +-spec(is_too_large(iodata(), pos_integer()) -> boolean()). +is_too_large(IoData, MaxSize) -> + iolist_size(IoData) >= MaxSize. + +get_property(_Key, undefined, Default) -> + Default; +get_property(Key, Props, Default) -> + maps:get(Key, Props, Default). + +%% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags +validate_header(?CONNECT, 0, 0, 0) -> ok; +validate_header(?CONNACK, 0, 0, 0) -> ok; +validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok; +validate_header(?PUBLISH, _, ?QOS_1, _) -> ok; +validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok; +validate_header(?PUBACK, 0, 0, 0) -> ok; +validate_header(?PUBREC, 0, 0, 0) -> ok; +validate_header(?PUBREL, 0, 1, 0) -> ok; +validate_header(?PUBCOMP, 0, 0, 0) -> ok; +validate_header(?SUBSCRIBE, 0, 1, 0) -> ok; +validate_header(?SUBACK, 0, 0, 0) -> ok; +validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok; +validate_header(?UNSUBACK, 0, 0, 0) -> ok; +validate_header(?PINGREQ, 0, 0, 0) -> ok; +validate_header(?PINGRESP, 0, 0, 0) -> ok; +validate_header(?DISCONNECT, 0, 0, 0) -> ok; +validate_header(?AUTH, 0, 0, 0) -> ok; +validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header). + +-compile({inline, [validate_packet_id/1]}). +validate_packet_id(0) -> error(bad_packet_id); +validate_packet_id(_) -> ok. + +validate_subqos([3|_]) -> error(bad_subqos); +validate_subqos([_|T]) -> validate_subqos(T); +validate_subqos([]) -> ok. + +bool(0) -> false; +bool(1) -> true. + +flag(undefined) -> ?RESERVED; +flag(false) -> 0; +flag(true) -> 1; +flag(X) when is_integer(X) -> X; +flag(B) when is_binary(B) -> 1. + +fixqos(?PUBREL, 0) -> 1; +fixqos(?SUBSCRIBE, 0) -> 1; +fixqos(?UNSUBSCRIBE, 0) -> 1; +fixqos(_Type, QoS) -> QoS. + diff --git a/src/emqtt_props.erl b/src/emqtt_props.erl new file mode 100644 index 0000000..a30f037 --- /dev/null +++ b/src/emqtt_props.erl @@ -0,0 +1,172 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc MQTT5 Properties +-module(emqtt_props). + +-include("emqtt.hrl"). + +-export([id/1, name/1, filter/2, validate/1]). + +%% For tests +-export([all/0]). + +-type(prop_name() :: atom()). +-type(prop_id() :: pos_integer()). + +-define(PROPS_TABLE, + #{16#01 => {'Payload-Format-Indicator', 'Byte', [?PUBLISH]}, + 16#02 => {'Message-Expiry-Interval', 'Four-Byte-Integer', [?PUBLISH]}, + 16#03 => {'Content-Type', 'UTF8-Encoded-String', [?PUBLISH]}, + 16#08 => {'Response-Topic', 'UTF8-Encoded-String', [?PUBLISH]}, + 16#09 => {'Correlation-Data', 'Binary-Data', [?PUBLISH]}, + 16#0B => {'Subscription-Identifier', 'Variable-Byte-Integer', [?PUBLISH, ?SUBSCRIBE]}, + 16#11 => {'Session-Expiry-Interval', 'Four-Byte-Integer', [?CONNECT, ?CONNACK, ?DISCONNECT]}, + 16#12 => {'Assigned-Client-Identifier', 'UTF8-Encoded-String', [?CONNACK]}, + 16#13 => {'Server-Keep-Alive', 'Two-Byte-Integer', [?CONNACK]}, + 16#15 => {'Authentication-Method', 'UTF8-Encoded-String', [?CONNECT, ?CONNACK, ?AUTH]}, + 16#16 => {'Authentication-Data', 'Binary-Data', [?CONNECT, ?CONNACK, ?AUTH]}, + 16#17 => {'Request-Problem-Information', 'Byte', [?CONNECT]}, + 16#18 => {'Will-Delay-Interval', 'Four-Byte-Integer', ['WILL']}, + 16#19 => {'Request-Response-Information', 'Byte', [?CONNECT]}, + 16#1A => {'Response-Information', 'UTF8-Encoded-String', [?CONNACK]}, + 16#1C => {'Server-Reference', 'UTF8-Encoded-String', [?CONNACK, ?DISCONNECT]}, + 16#1F => {'Reason-String', 'UTF8-Encoded-String', [?CONNACK, ?DISCONNECT, ?PUBACK, + ?PUBREC, ?PUBREL, ?PUBCOMP, + ?SUBACK, ?UNSUBACK, ?AUTH]}, + 16#21 => {'Receive-Maximum', 'Two-Byte-Integer', [?CONNECT, ?CONNACK]}, + 16#22 => {'Topic-Alias-Maximum', 'Two-Byte-Integer', [?CONNECT, ?CONNACK]}, + 16#23 => {'Topic-Alias', 'Two-Byte-Integer', [?PUBLISH]}, + 16#24 => {'Maximum-QoS', 'Byte', [?CONNACK]}, + 16#25 => {'Retain-Available', 'Byte', [?CONNACK]}, + 16#26 => {'User-Property', 'UTF8-String-Pair', 'ALL'}, + 16#27 => {'Maximum-Packet-Size', 'Four-Byte-Integer', [?CONNECT, ?CONNACK]}, + 16#28 => {'Wildcard-Subscription-Available', 'Byte', [?CONNACK]}, + 16#29 => {'Subscription-Identifier-Available', 'Byte', [?CONNACK]}, + 16#2A => {'Shared-Subscription-Available', 'Byte', [?CONNACK]} + }). + +-spec(id(prop_name()) -> prop_id()). +id('Payload-Format-Indicator') -> 16#01; +id('Message-Expiry-Interval') -> 16#02; +id('Content-Type') -> 16#03; +id('Response-Topic') -> 16#08; +id('Correlation-Data') -> 16#09; +id('Subscription-Identifier') -> 16#0B; +id('Session-Expiry-Interval') -> 16#11; +id('Assigned-Client-Identifier') -> 16#12; +id('Server-Keep-Alive') -> 16#13; +id('Authentication-Method') -> 16#15; +id('Authentication-Data') -> 16#16; +id('Request-Problem-Information') -> 16#17; +id('Will-Delay-Interval') -> 16#18; +id('Request-Response-Information') -> 16#19; +id('Response-Information') -> 16#1A; +id('Server-Reference') -> 16#1C; +id('Reason-String') -> 16#1F; +id('Receive-Maximum') -> 16#21; +id('Topic-Alias-Maximum') -> 16#22; +id('Topic-Alias') -> 16#23; +id('Maximum-QoS') -> 16#24; +id('Retain-Available') -> 16#25; +id('User-Property') -> 16#26; +id('Maximum-Packet-Size') -> 16#27; +id('Wildcard-Subscription-Available') -> 16#28; +id('Subscription-Identifier-Available') -> 16#29; +id('Shared-Subscription-Available') -> 16#2A; +id(Name) -> error({bad_property, Name}). + +-spec(name(prop_id()) -> prop_name()). +name(16#01) -> 'Payload-Format-Indicator'; +name(16#02) -> 'Message-Expiry-Interval'; +name(16#03) -> 'Content-Type'; +name(16#08) -> 'Response-Topic'; +name(16#09) -> 'Correlation-Data'; +name(16#0B) -> 'Subscription-Identifier'; +name(16#11) -> 'Session-Expiry-Interval'; +name(16#12) -> 'Assigned-Client-Identifier'; +name(16#13) -> 'Server-Keep-Alive'; +name(16#15) -> 'Authentication-Method'; +name(16#16) -> 'Authentication-Data'; +name(16#17) -> 'Request-Problem-Information'; +name(16#18) -> 'Will-Delay-Interval'; +name(16#19) -> 'Request-Response-Information'; +name(16#1A) -> 'Response-Information'; +name(16#1C) -> 'Server-Reference'; +name(16#1F) -> 'Reason-String'; +name(16#21) -> 'Receive-Maximum'; +name(16#22) -> 'Topic-Alias-Maximum'; +name(16#23) -> 'Topic-Alias'; +name(16#24) -> 'Maximum-QoS'; +name(16#25) -> 'Retain-Available'; +name(16#26) -> 'User-Property'; +name(16#27) -> 'Maximum-Packet-Size'; +name(16#28) -> 'Wildcard-Subscription-Available'; +name(16#29) -> 'Subscription-Identifier-Available'; +name(16#2A) -> 'Shared-Subscription-Available'; +name(Id) -> error({unsupported_property, Id}). + +filter(PacketType, Props) when is_map(Props) -> + maps:from_list(filter(PacketType, maps:to_list(Props))); + +filter(PacketType, Props) when ?CONNECT =< PacketType, PacketType =< ?AUTH, is_list(Props) -> + Filter = fun(Name) -> + case maps:find(id(Name), ?PROPS_TABLE) of + {ok, {Name, _Type, 'ALL'}} -> + true; + {ok, {Name, _Type, AllowedTypes}} -> + lists:member(PacketType, AllowedTypes); + error -> false + end + end, + [Prop || Prop = {Name, _} <- Props, Filter(Name)]. + +validate(Props) when is_map(Props) -> + lists:foreach(fun validate_prop/1, maps:to_list(Props)). + +validate_prop(Prop = {Name, Val}) -> + case maps:find(id(Name), ?PROPS_TABLE) of + {ok, {Name, Type, _}} -> + validate_value(Type, Val) + orelse error(bad_property, Prop); + error -> + error({bad_property, Prop}) + end. + +validate_value('Byte', Val) -> + is_integer(Val) andalso Val =< 16#FF; +validate_value('Two-Byte-Integer', Val) -> + is_integer(Val) andalso 0 =< Val andalso Val =< 16#FFFF; +validate_value('Four-Byte-Integer', Val) -> + is_integer(Val) andalso 0 =< Val andalso Val =< 16#FFFFFFFF; +validate_value('Variable-Byte-Integer', Val) -> + is_integer(Val) andalso 0 =< Val andalso Val =< 16#7FFFFFFF; +validate_value('UTF8-String-Pair', {Name, Val}) -> + validate_value('UTF8-Encoded-String', Name) + andalso validate_value('UTF8-Encoded-String', Val); +validate_value('UTF8-String-Pair', Pairs) when is_list(Pairs) -> + lists:foldl(fun(Pair, OK) -> + OK andalso validate_value('UTF8-String-Pair', Pair) + end, true, Pairs); +validate_value('UTF8-Encoded-String', Val) -> + is_binary(Val); +validate_value('Binary-Data', Val) -> + is_binary(Val); +validate_value(_Type, _Val) -> false. + +-spec(all() -> map()). +all() -> ?PROPS_TABLE. + diff --git a/src/emqtt_sock.erl b/src/emqtt_sock.erl new file mode 100644 index 0000000..05b234f --- /dev/null +++ b/src/emqtt_sock.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqtt_sock). + +-export([connect/4, send/2, recv/2, close/1 ]). + +-export([ sockname/1, setopts/2, getstat/2 ]). + +-record(ssl_socket, { + tcp, + ssl +}). + +-type(socket() :: inet:socket() | #ssl_socket{}). + +-type(sockname() :: {inet:ip_address(), inet:port_number()}). + +-type(option() :: gen_tcp:connect_option() | {ssl_opts, [ssl:ssl_option()]}). + +-export_type([socket/0, option/0]). + +-define(DEFAULT_TCP_OPTIONS, [binary, {packet, raw}, {active, false}, + {nodelay, true}]). + +-spec(connect(inet:ip_address() | inet:hostname(), + inet:port_number(), [option()], timeout()) + -> {ok, socket()} | {error, term()}). +connect(Host, Port, SockOpts, Timeout) -> + TcpOpts = merge_opts(?DEFAULT_TCP_OPTIONS, + lists:keydelete(ssl_opts, 1, SockOpts)), + case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of + {ok, Sock} -> + case lists:keyfind(ssl_opts, 1, SockOpts) of + {ssl_opts, SslOpts} -> + ssl_upgrade(Sock, SslOpts, Timeout); + false -> + {ok, Sock} + end; + {error, Reason} -> + {error, Reason} + end. + +ssl_upgrade(Sock, SslOpts, Timeout) -> + TlsVersions = proplists:get_value(versions, SslOpts, []), + Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)), + SslOpts2 = merge_opts(SslOpts, [{ciphers, Ciphers}]), + case ssl:connect(Sock, SslOpts2, Timeout) of + {ok, SslSock} -> + ok = ssl:controlling_process(SslSock, self()), + {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; + {error, Reason} -> + {error, Reason} + end. + +-spec(send(socket(), iodata()) -> ok | {error, einval | closed}). +send(Sock, Data) when is_port(Sock) -> + gen_tcp:send(Sock, Data); +send(#ssl_socket{ssl = SslSock}, Data) -> + ssl:send(SslSock, Data). + +-spec(recv(socket(), non_neg_integer()) + -> {ok, iodata()} | {error, closed | inet:posix()}). +recv(Sock, Length) when is_port(Sock) -> + gen_tcp:recv(Sock, Length); +recv(#ssl_socket{ssl = SslSock}, Length) -> + ssl:recv(SslSock, Length). + +-spec(close(socket()) -> ok). +close(Sock) when is_port(Sock) -> + gen_tcp:close(Sock); +close(#ssl_socket{ssl = SslSock}) -> + ssl:close(SslSock). + +-spec(setopts(socket(), [gen_tcp:option() | ssl:socketoption()]) -> ok). +setopts(Sock, Opts) when is_port(Sock) -> + inet:setopts(Sock, Opts); +setopts(#ssl_socket{ssl = SslSock}, Opts) -> + ssl:setopts(SslSock, Opts). + +-spec(getstat(socket(), [atom()]) + -> {ok, [{atom(), integer()}]} | {error, term()}). +getstat(Sock, Options) when is_port(Sock) -> + inet:getstat(Sock, Options); +getstat(#ssl_socket{tcp = Sock}, Options) -> + inet:getstat(Sock, Options). + +-spec(sockname(socket()) -> {ok, sockname()} | {error, term()}). +sockname(Sock) when is_port(Sock) -> + inet:sockname(Sock); +sockname(#ssl_socket{ssl = SslSock}) -> + ssl:sockname(SslSock). + +-spec(merge_opts(list(), list()) -> list()). +merge_opts(Defaults, Options) -> + lists:foldl( + fun({Opt, Val}, Acc) -> + lists:keystore(Opt, 1, Acc, {Opt, Val}); + (Opt, Acc) -> + lists:usort([Opt | Acc]) + end, Defaults, Options). + +default_ciphers(TlsVersions) -> + lists:foldl( + fun(TlsVer, Ciphers) -> + Ciphers ++ ssl:cipher_suites(all, TlsVer) + end, [], TlsVersions). \ No newline at end of file