init project
This commit is contained in:
commit
5111914a9b
19
.gitignore
vendored
Normal file
19
.gitignore
vendored
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
.rebar3
|
||||||
|
_*
|
||||||
|
.eunit
|
||||||
|
*.o
|
||||||
|
*.beam
|
||||||
|
*.plt
|
||||||
|
*.swp
|
||||||
|
*.swo
|
||||||
|
.erlang.cookie
|
||||||
|
ebin
|
||||||
|
log
|
||||||
|
erl_crash.dump
|
||||||
|
.rebar
|
||||||
|
logs
|
||||||
|
_build
|
||||||
|
.idea
|
||||||
|
*.iml
|
||||||
|
rebar3.crashdump
|
||||||
|
*~
|
||||||
191
LICENSE
Normal file
191
LICENSE
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
Apache License
|
||||||
|
Version 2.0, January 2004
|
||||||
|
http://www.apache.org/licenses/
|
||||||
|
|
||||||
|
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||||
|
|
||||||
|
1. Definitions.
|
||||||
|
|
||||||
|
"License" shall mean the terms and conditions for use, reproduction,
|
||||||
|
and distribution as defined by Sections 1 through 9 of this document.
|
||||||
|
|
||||||
|
"Licensor" shall mean the copyright owner or entity authorized by
|
||||||
|
the copyright owner that is granting the License.
|
||||||
|
|
||||||
|
"Legal Entity" shall mean the union of the acting entity and all
|
||||||
|
other entities that control, are controlled by, or are under common
|
||||||
|
control with that entity. For the purposes of this definition,
|
||||||
|
"control" means (i) the power, direct or indirect, to cause the
|
||||||
|
direction or management of such entity, whether by contract or
|
||||||
|
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||||
|
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||||
|
|
||||||
|
"You" (or "Your") shall mean an individual or Legal Entity
|
||||||
|
exercising permissions granted by this License.
|
||||||
|
|
||||||
|
"Source" form shall mean the preferred form for making modifications,
|
||||||
|
including but not limited to software source code, documentation
|
||||||
|
source, and configuration files.
|
||||||
|
|
||||||
|
"Object" form shall mean any form resulting from mechanical
|
||||||
|
transformation or translation of a Source form, including but
|
||||||
|
not limited to compiled object code, generated documentation,
|
||||||
|
and conversions to other media types.
|
||||||
|
|
||||||
|
"Work" shall mean the work of authorship, whether in Source or
|
||||||
|
Object form, made available under the License, as indicated by a
|
||||||
|
copyright notice that is included in or attached to the work
|
||||||
|
(an example is provided in the Appendix below).
|
||||||
|
|
||||||
|
"Derivative Works" shall mean any work, whether in Source or Object
|
||||||
|
form, that is based on (or derived from) the Work and for which the
|
||||||
|
editorial revisions, annotations, elaborations, or other modifications
|
||||||
|
represent, as a whole, an original work of authorship. For the purposes
|
||||||
|
of this License, Derivative Works shall not include works that remain
|
||||||
|
separable from, or merely link (or bind by name) to the interfaces of,
|
||||||
|
the Work and Derivative Works thereof.
|
||||||
|
|
||||||
|
"Contribution" shall mean any work of authorship, including
|
||||||
|
the original version of the Work and any modifications or additions
|
||||||
|
to that Work or Derivative Works thereof, that is intentionally
|
||||||
|
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||||
|
or by an individual or Legal Entity authorized to submit on behalf of
|
||||||
|
the copyright owner. For the purposes of this definition, "submitted"
|
||||||
|
means any form of electronic, verbal, or written communication sent
|
||||||
|
to the Licensor or its representatives, including but not limited to
|
||||||
|
communication on electronic mailing lists, source code control systems,
|
||||||
|
and issue tracking systems that are managed by, or on behalf of, the
|
||||||
|
Licensor for the purpose of discussing and improving the Work, but
|
||||||
|
excluding communication that is conspicuously marked or otherwise
|
||||||
|
designated in writing by the copyright owner as "Not a Contribution."
|
||||||
|
|
||||||
|
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||||
|
on behalf of whom a Contribution has been received by Licensor and
|
||||||
|
subsequently incorporated within the Work.
|
||||||
|
|
||||||
|
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||||
|
this License, each Contributor hereby grants to You a perpetual,
|
||||||
|
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||||
|
copyright license to reproduce, prepare Derivative Works of,
|
||||||
|
publicly display, publicly perform, sublicense, and distribute the
|
||||||
|
Work and such Derivative Works in Source or Object form.
|
||||||
|
|
||||||
|
3. Grant of Patent License. Subject to the terms and conditions of
|
||||||
|
this License, each Contributor hereby grants to You a perpetual,
|
||||||
|
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||||
|
(except as stated in this section) patent license to make, have made,
|
||||||
|
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||||
|
where such license applies only to those patent claims licensable
|
||||||
|
by such Contributor that are necessarily infringed by their
|
||||||
|
Contribution(s) alone or by combination of their Contribution(s)
|
||||||
|
with the Work to which such Contribution(s) was submitted. If You
|
||||||
|
institute patent litigation against any entity (including a
|
||||||
|
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||||
|
or a Contribution incorporated within the Work constitutes direct
|
||||||
|
or contributory patent infringement, then any patent licenses
|
||||||
|
granted to You under this License for that Work shall terminate
|
||||||
|
as of the date such litigation is filed.
|
||||||
|
|
||||||
|
4. Redistribution. You may reproduce and distribute copies of the
|
||||||
|
Work or Derivative Works thereof in any medium, with or without
|
||||||
|
modifications, and in Source or Object form, provided that You
|
||||||
|
meet the following conditions:
|
||||||
|
|
||||||
|
(a) You must give any other recipients of the Work or
|
||||||
|
Derivative Works a copy of this License; and
|
||||||
|
|
||||||
|
(b) You must cause any modified files to carry prominent notices
|
||||||
|
stating that You changed the files; and
|
||||||
|
|
||||||
|
(c) You must retain, in the Source form of any Derivative Works
|
||||||
|
that You distribute, all copyright, patent, trademark, and
|
||||||
|
attribution notices from the Source form of the Work,
|
||||||
|
excluding those notices that do not pertain to any part of
|
||||||
|
the Derivative Works; and
|
||||||
|
|
||||||
|
(d) If the Work includes a "NOTICE" text file as part of its
|
||||||
|
distribution, then any Derivative Works that You distribute must
|
||||||
|
include a readable copy of the attribution notices contained
|
||||||
|
within such NOTICE file, excluding those notices that do not
|
||||||
|
pertain to any part of the Derivative Works, in at least one
|
||||||
|
of the following places: within a NOTICE text file distributed
|
||||||
|
as part of the Derivative Works; within the Source form or
|
||||||
|
documentation, if provided along with the Derivative Works; or,
|
||||||
|
within a display generated by the Derivative Works, if and
|
||||||
|
wherever such third-party notices normally appear. The contents
|
||||||
|
of the NOTICE file are for informational purposes only and
|
||||||
|
do not modify the License. You may add Your own attribution
|
||||||
|
notices within Derivative Works that You distribute, alongside
|
||||||
|
or as an addendum to the NOTICE text from the Work, provided
|
||||||
|
that such additional attribution notices cannot be construed
|
||||||
|
as modifying the License.
|
||||||
|
|
||||||
|
You may add Your own copyright statement to Your modifications and
|
||||||
|
may provide additional or different license terms and conditions
|
||||||
|
for use, reproduction, or distribution of Your modifications, or
|
||||||
|
for any such Derivative Works as a whole, provided Your use,
|
||||||
|
reproduction, and distribution of the Work otherwise complies with
|
||||||
|
the conditions stated in this License.
|
||||||
|
|
||||||
|
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||||
|
any Contribution intentionally submitted for inclusion in the Work
|
||||||
|
by You to the Licensor shall be under the terms and conditions of
|
||||||
|
this License, without any additional terms or conditions.
|
||||||
|
Notwithstanding the above, nothing herein shall supersede or modify
|
||||||
|
the terms of any separate license agreement you may have executed
|
||||||
|
with Licensor regarding such Contributions.
|
||||||
|
|
||||||
|
6. Trademarks. This License does not grant permission to use the trade
|
||||||
|
names, trademarks, service marks, or product names of the Licensor,
|
||||||
|
except as required for reasonable and customary use in describing the
|
||||||
|
origin of the Work and reproducing the content of the NOTICE file.
|
||||||
|
|
||||||
|
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||||
|
agreed to in writing, Licensor provides the Work (and each
|
||||||
|
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
implied, including, without limitation, any warranties or conditions
|
||||||
|
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||||
|
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||||
|
appropriateness of using or redistributing the Work and assume any
|
||||||
|
risks associated with Your exercise of permissions under this License.
|
||||||
|
|
||||||
|
8. Limitation of Liability. In no event and under no legal theory,
|
||||||
|
whether in tort (including negligence), contract, or otherwise,
|
||||||
|
unless required by applicable law (such as deliberate and grossly
|
||||||
|
negligent acts) or agreed to in writing, shall any Contributor be
|
||||||
|
liable to You for damages, including any direct, indirect, special,
|
||||||
|
incidental, or consequential damages of any character arising as a
|
||||||
|
result of this License or out of the use or inability to use the
|
||||||
|
Work (including but not limited to damages for loss of goodwill,
|
||||||
|
work stoppage, computer failure or malfunction, or any and all
|
||||||
|
other commercial damages or losses), even if such Contributor
|
||||||
|
has been advised of the possibility of such damages.
|
||||||
|
|
||||||
|
9. Accepting Warranty or Additional Liability. While redistributing
|
||||||
|
the Work or Derivative Works thereof, You may choose to offer,
|
||||||
|
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||||
|
or other liability obligations and/or rights consistent with this
|
||||||
|
License. However, in accepting such obligations, You may act only
|
||||||
|
on Your own behalf and on Your sole responsibility, not on behalf
|
||||||
|
of any other Contributor, and only if You agree to indemnify,
|
||||||
|
defend, and hold each Contributor harmless for any liability
|
||||||
|
incurred by, or claims asserted against, such Contributor by reason
|
||||||
|
of your accepting any such warranty or additional liability.
|
||||||
|
|
||||||
|
END OF TERMS AND CONDITIONS
|
||||||
|
|
||||||
|
Copyright 2025, anlicheng <244108715@qq.com>.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
|
||||||
9
README.md
Normal file
9
README.md
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
emqtt
|
||||||
|
=====
|
||||||
|
|
||||||
|
An OTP library
|
||||||
|
|
||||||
|
Build
|
||||||
|
-----
|
||||||
|
|
||||||
|
$ rebar3 compile
|
||||||
535
include/emqtt.hrl
Normal file
535
include/emqtt.hrl
Normal file
@ -0,0 +1,535 @@
|
|||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifndef(EMQTT_HRL).
|
||||||
|
-define(EMQTT_HRL, true).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Protocol Version and Names
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(MQTT_PROTO_V3, 3).
|
||||||
|
-define(MQTT_PROTO_V4, 4).
|
||||||
|
-define(MQTT_PROTO_V5, 5).
|
||||||
|
|
||||||
|
-define(PROTOCOL_NAMES, [
|
||||||
|
{?MQTT_PROTO_V3, <<"MQIsdp">>},
|
||||||
|
{?MQTT_PROTO_V4, <<"MQTT">>},
|
||||||
|
{?MQTT_PROTO_V5, <<"MQTT">>}]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT QoS Levels
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(QOS_0, 0). %% At most once
|
||||||
|
-define(QOS_1, 1). %% At least once
|
||||||
|
-define(QOS_2, 2). %% Exactly once
|
||||||
|
|
||||||
|
-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)).
|
||||||
|
|
||||||
|
-define(QOS_I(Name),
|
||||||
|
begin
|
||||||
|
(case Name of
|
||||||
|
?QOS_0 -> ?QOS_0;
|
||||||
|
qos0 -> ?QOS_0;
|
||||||
|
at_most_once -> ?QOS_0;
|
||||||
|
?QOS_1 -> ?QOS_1;
|
||||||
|
qos1 -> ?QOS_1;
|
||||||
|
at_least_once -> ?QOS_1;
|
||||||
|
?QOS_2 -> ?QOS_2;
|
||||||
|
qos2 -> ?QOS_2;
|
||||||
|
exactly_once -> ?QOS_2
|
||||||
|
end)
|
||||||
|
end).
|
||||||
|
|
||||||
|
-define(IS_QOS_NAME(I),
|
||||||
|
(I =:= qos0 orelse I =:= at_most_once orelse
|
||||||
|
I =:= qos1 orelse I =:= at_least_once orelse
|
||||||
|
I =:= qos2 orelse I =:= exactly_once)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Maximum ClientId Length.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(MAX_CLIENTID_LEN, 65535).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Control Packet Types
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(RESERVED, 0). %% Reserved
|
||||||
|
-define(CONNECT, 1). %% Client request to connect to Server
|
||||||
|
-define(CONNACK, 2). %% Server to Client: Connect acknowledgment
|
||||||
|
-define(PUBLISH, 3). %% Publish message
|
||||||
|
-define(PUBACK, 4). %% Publish acknowledgment
|
||||||
|
-define(PUBREC, 5). %% Publish received (assured delivery part 1)
|
||||||
|
-define(PUBREL, 6). %% Publish release (assured delivery part 2)
|
||||||
|
-define(PUBCOMP, 7). %% Publish complete (assured delivery part 3)
|
||||||
|
-define(SUBSCRIBE, 8). %% Client subscribe request
|
||||||
|
-define(SUBACK, 9). %% Server Subscribe acknowledgment
|
||||||
|
-define(UNSUBSCRIBE, 10). %% Unsubscribe request
|
||||||
|
-define(UNSUBACK, 11). %% Unsubscribe acknowledgment
|
||||||
|
-define(PINGREQ, 12). %% PING request
|
||||||
|
-define(PINGRESP, 13). %% PING response
|
||||||
|
-define(DISCONNECT, 14). %% Client or Server is disconnecting
|
||||||
|
-define(AUTH, 15). %% Authentication exchange
|
||||||
|
|
||||||
|
-define(TYPE_NAMES, [
|
||||||
|
'CONNECT',
|
||||||
|
'CONNACK',
|
||||||
|
'PUBLISH',
|
||||||
|
'PUBACK',
|
||||||
|
'PUBREC',
|
||||||
|
'PUBREL',
|
||||||
|
'PUBCOMP',
|
||||||
|
'SUBSCRIBE',
|
||||||
|
'SUBACK',
|
||||||
|
'UNSUBSCRIBE',
|
||||||
|
'UNSUBACK',
|
||||||
|
'PINGREQ',
|
||||||
|
'PINGRESP',
|
||||||
|
'DISCONNECT',
|
||||||
|
'AUTH']).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT V3.1.1 Connect Return Codes
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(CONNACK_ACCEPT, 0). %% Connection accepted
|
||||||
|
-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version
|
||||||
|
-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server
|
||||||
|
-define(CONNACK_SERVER, 3). %% Server unavailable
|
||||||
|
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
|
||||||
|
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT V5.0 Reason Codes
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(RC_SUCCESS, 16#00).
|
||||||
|
-define(RC_NORMAL_DISCONNECTION, 16#00).
|
||||||
|
-define(RC_GRANTED_QOS_0, 16#00).
|
||||||
|
-define(RC_GRANTED_QOS_1, 16#01).
|
||||||
|
-define(RC_GRANTED_QOS_2, 16#02).
|
||||||
|
-define(RC_DISCONNECT_WITH_WILL_MESSAGE, 16#04).
|
||||||
|
-define(RC_NO_MATCHING_SUBSCRIBERS, 16#10).
|
||||||
|
-define(RC_NO_SUBSCRIPTION_EXISTED, 16#11).
|
||||||
|
-define(RC_CONTINUE_AUTHENTICATION, 16#18).
|
||||||
|
-define(RC_RE_AUTHENTICATE, 16#19).
|
||||||
|
-define(RC_UNSPECIFIED_ERROR, 16#80).
|
||||||
|
-define(RC_MALFORMED_PACKET, 16#81).
|
||||||
|
-define(RC_PROTOCOL_ERROR, 16#82).
|
||||||
|
-define(RC_IMPLEMENTATION_SPECIFIC_ERROR, 16#83).
|
||||||
|
-define(RC_UNSUPPORTED_PROTOCOL_VERSION, 16#84).
|
||||||
|
-define(RC_CLIENT_IDENTIFIER_NOT_VALID, 16#85).
|
||||||
|
-define(RC_BAD_USER_NAME_OR_PASSWORD, 16#86).
|
||||||
|
-define(RC_NOT_AUTHORIZED, 16#87).
|
||||||
|
-define(RC_SERVER_UNAVAILABLE, 16#88).
|
||||||
|
-define(RC_SERVER_BUSY, 16#89).
|
||||||
|
-define(RC_BANNED, 16#8A).
|
||||||
|
-define(RC_SERVER_SHUTTING_DOWN, 16#8B).
|
||||||
|
-define(RC_BAD_AUTHENTICATION_METHOD, 16#8C).
|
||||||
|
-define(RC_KEEP_ALIVE_TIMEOUT, 16#8D).
|
||||||
|
-define(RC_SESSION_TAKEN_OVER, 16#8E).
|
||||||
|
-define(RC_TOPIC_FILTER_INVALID, 16#8F).
|
||||||
|
-define(RC_TOPIC_NAME_INVALID, 16#90).
|
||||||
|
-define(RC_PACKET_IDENTIFIER_IN_USE, 16#91).
|
||||||
|
-define(RC_PACKET_IDENTIFIER_NOT_FOUND, 16#92).
|
||||||
|
-define(RC_RECEIVE_MAXIMUM_EXCEEDED, 16#93).
|
||||||
|
-define(RC_TOPIC_ALIAS_INVALID, 16#94).
|
||||||
|
-define(RC_PACKET_TOO_LARGE, 16#95).
|
||||||
|
-define(RC_MESSAGE_RATE_TOO_HIGH, 16#96).
|
||||||
|
-define(RC_QUOTA_EXCEEDED, 16#97).
|
||||||
|
-define(RC_ADMINISTRATIVE_ACTION, 16#98).
|
||||||
|
-define(RC_PAYLOAD_FORMAT_INVALID, 16#99).
|
||||||
|
-define(RC_RETAIN_NOT_SUPPORTED, 16#9A).
|
||||||
|
-define(RC_QOS_NOT_SUPPORTED, 16#9B).
|
||||||
|
-define(RC_USE_ANOTHER_SERVER, 16#9C).
|
||||||
|
-define(RC_SERVER_MOVED, 16#9D).
|
||||||
|
-define(RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED, 16#9E).
|
||||||
|
-define(RC_CONNECTION_RATE_EXCEEDED, 16#9F).
|
||||||
|
-define(RC_MAXIMUM_CONNECT_TIME, 16#A0).
|
||||||
|
-define(RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED, 16#A1).
|
||||||
|
-define(RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED, 16#A2).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Maximum MQTT Packet ID and Length
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(MAX_PACKET_ID, 16#ffff).
|
||||||
|
-define(MAX_PACKET_SIZE, 16#fffffff).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Frame Mask
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(HIGHBIT, 2#10000000).
|
||||||
|
-define(LOWBITS, 2#01111111).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Packet Fixed Header
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-record(mqtt_packet_header, {
|
||||||
|
type = ?RESERVED,
|
||||||
|
dup = false,
|
||||||
|
qos = ?QOS_0,
|
||||||
|
retain = false
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Packets
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling
|
||||||
|
rap => 0, %% Retain as Publish
|
||||||
|
nl => 0, %% No Local
|
||||||
|
qos => 0 %% QoS
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_connect, {
|
||||||
|
proto_name = <<"MQTT">>,
|
||||||
|
proto_ver = ?MQTT_PROTO_V4,
|
||||||
|
is_bridge = false,
|
||||||
|
clean_start = true,
|
||||||
|
will_flag = false,
|
||||||
|
will_qos = ?QOS_0,
|
||||||
|
will_retain = false,
|
||||||
|
keepalive = 0,
|
||||||
|
properties = undefined,
|
||||||
|
clientid = <<>>,
|
||||||
|
will_props = undefined,
|
||||||
|
will_topic = undefined,
|
||||||
|
will_payload = undefined,
|
||||||
|
username = undefined,
|
||||||
|
password = undefined
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_connack, {
|
||||||
|
ack_flags,
|
||||||
|
reason_code,
|
||||||
|
properties
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_publish, {
|
||||||
|
topic_name,
|
||||||
|
packet_id,
|
||||||
|
properties
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_puback, {
|
||||||
|
packet_id,
|
||||||
|
reason_code,
|
||||||
|
properties
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_subscribe, {
|
||||||
|
packet_id,
|
||||||
|
properties,
|
||||||
|
topic_filters
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_suback, {
|
||||||
|
packet_id,
|
||||||
|
properties,
|
||||||
|
reason_codes
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_unsubscribe, {
|
||||||
|
packet_id,
|
||||||
|
properties,
|
||||||
|
topic_filters
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_unsuback, {
|
||||||
|
packet_id,
|
||||||
|
properties,
|
||||||
|
reason_codes
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_disconnect, {
|
||||||
|
reason_code,
|
||||||
|
properties
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_packet_auth, {
|
||||||
|
reason_code,
|
||||||
|
properties
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Message
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-record(mqtt_msg, {
|
||||||
|
qos = ?QOS_0 :: emqtt:qos(),
|
||||||
|
retain = false :: boolean(),
|
||||||
|
dup = false :: boolean(),
|
||||||
|
packet_id :: emqtt:packet_id(),
|
||||||
|
topic :: emqtt:topic(),
|
||||||
|
props :: emqtt:properties(),
|
||||||
|
payload :: binary()
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Control Packet
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-record(mqtt_packet, {
|
||||||
|
header :: #mqtt_packet_header{},
|
||||||
|
variable :: #mqtt_packet_connect{}
|
||||||
|
| #mqtt_packet_connack{}
|
||||||
|
| #mqtt_packet_publish{}
|
||||||
|
| #mqtt_packet_puback{}
|
||||||
|
| #mqtt_packet_subscribe{}
|
||||||
|
| #mqtt_packet_suback{}
|
||||||
|
| #mqtt_packet_unsubscribe{}
|
||||||
|
| #mqtt_packet_unsuback{}
|
||||||
|
| #mqtt_packet_disconnect{}
|
||||||
|
| #mqtt_packet_auth{}
|
||||||
|
| pos_integer()
|
||||||
|
| undefined,
|
||||||
|
payload :: binary() | undefined
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT Packet Match
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(CONNECT_PACKET(Var),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
|
||||||
|
variable = Var}).
|
||||||
|
|
||||||
|
-define(CONNACK_PACKET(ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||||
|
variable = #mqtt_packet_connack{ack_flags = 0,
|
||||||
|
reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(CONNACK_PACKET(ReasonCode, SessPresent),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||||
|
variable = #mqtt_packet_connack{ack_flags = SessPresent,
|
||||||
|
reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
|
||||||
|
variable = #mqtt_packet_connack{ack_flags = SessPresent,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(AUTH_PACKET(),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
|
||||||
|
variable = #mqtt_packet_auth{reason_code = 0}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(AUTH_PACKET(ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
|
||||||
|
variable = #mqtt_packet_auth{reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(AUTH_PACKET(ReasonCode, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
|
||||||
|
variable = #mqtt_packet_auth{reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBLISH_PACKET(QoS),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = QoS}}).
|
||||||
|
|
||||||
|
-define(PUBLISH_PACKET(QoS, PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
qos = QoS},
|
||||||
|
variable = #mqtt_packet_publish{packet_id = PacketId}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
qos = QoS},
|
||||||
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||||
|
packet_id = PacketId},
|
||||||
|
payload = Payload
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
qos = QoS},
|
||||||
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||||
|
packet_id = PacketId,
|
||||||
|
properties = Properties},
|
||||||
|
payload = Payload
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBACK_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = 0}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBACK_PACKET(PacketId, ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBACK_PACKET(PacketId, ReasonCode, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBREC_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = 0}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBREC_PACKET(PacketId, ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBREC_PACKET(PacketId, ReasonCode, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBREL_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = 0}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBREL_PACKET(PacketId, ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBREL_PACKET(PacketId, ReasonCode, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBCOMP_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = 0}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBCOMP_PACKET(PacketId, ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
|
||||||
|
variable = #mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(SUBSCRIBE_PACKET(PacketId, TopicFilters),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
topic_filters = TopicFilters}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(SUBACK_PACKET(PacketId, ReasonCodes),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
|
||||||
|
variable = #mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
reason_codes = ReasonCodes}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(SUBACK_PACKET(PacketId, Properties, ReasonCodes),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK},
|
||||||
|
variable = #mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(UNSUBSCRIBE_PACKET(PacketId, TopicFilters),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
topic_filters = TopicFilters}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE,
|
||||||
|
qos = ?QOS_1},
|
||||||
|
variable = #mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(UNSUBACK_PACKET(PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
|
||||||
|
variable = #mqtt_packet_unsuback{packet_id = PacketId}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(UNSUBACK_PACKET(PacketId, ReasonCodes),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
|
||||||
|
variable = #mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
|
reason_codes = ReasonCodes}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(UNSUBACK_PACKET(PacketId, Properties, ReasonCodes),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK},
|
||||||
|
variable = #mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(DISCONNECT_PACKET(),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
|
||||||
|
variable = #mqtt_packet_disconnect{reason_code = 0}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(DISCONNECT_PACKET(ReasonCode),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
|
||||||
|
variable = #mqtt_packet_disconnect{reason_code = ReasonCode}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(DISCONNECT_PACKET(ReasonCode, Properties),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
|
||||||
|
variable = #mqtt_packet_disconnect{reason_code = ReasonCode,
|
||||||
|
properties = Properties}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}).
|
||||||
|
|
||||||
|
-endif.
|
||||||
2
rebar.config
Normal file
2
rebar.config
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
{erl_opts, [debug_info]}.
|
||||||
|
{deps, []}.
|
||||||
1
rebar.lock
Normal file
1
rebar.lock
Normal file
@ -0,0 +1 @@
|
|||||||
|
[].
|
||||||
14
src/emqtt.app.src
Normal file
14
src/emqtt.app.src
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
{application, emqtt,
|
||||||
|
[{description, "An OTP library"},
|
||||||
|
{vsn, "0.1.0"},
|
||||||
|
{registered, []},
|
||||||
|
{applications,
|
||||||
|
[kernel,
|
||||||
|
stdlib
|
||||||
|
]},
|
||||||
|
{env,[]},
|
||||||
|
{modules, []},
|
||||||
|
|
||||||
|
{licenses, ["Apache-2.0"]},
|
||||||
|
{links, []}
|
||||||
|
]}.
|
||||||
1319
src/emqtt.erl
Normal file
1319
src/emqtt.erl
Normal file
File diff suppressed because it is too large
Load Diff
738
src/emqtt_frame.erl
Normal file
738
src/emqtt_frame.erl
Normal file
@ -0,0 +1,738 @@
|
|||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqtt_frame).
|
||||||
|
|
||||||
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
|
-export([initial_parse_state/0, initial_parse_state/1]).
|
||||||
|
|
||||||
|
-export([parse/1, parse/2, serialize_fun/0, serialize_fun/1, serialize/1, serialize/2 ]).
|
||||||
|
|
||||||
|
-export_type([options/0, parse_state/0, parse_result/0, serialize_fun/0]).
|
||||||
|
|
||||||
|
-type(version() :: ?MQTT_PROTO_V3
|
||||||
|
| ?MQTT_PROTO_V4
|
||||||
|
| ?MQTT_PROTO_V5).
|
||||||
|
|
||||||
|
-type(options() :: #{strict_mode => boolean(),
|
||||||
|
max_size => 1..?MAX_PACKET_SIZE,
|
||||||
|
version => version()}).
|
||||||
|
|
||||||
|
-opaque(parse_state() :: {none, options()} | cont_fun()).
|
||||||
|
|
||||||
|
-opaque(parse_result() :: {more, cont_fun()}
|
||||||
|
| {ok, #mqtt_packet{}, binary(), parse_state()}).
|
||||||
|
|
||||||
|
-type(cont_fun() :: fun((binary()) -> parse_result())).
|
||||||
|
|
||||||
|
-type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())).
|
||||||
|
|
||||||
|
-define(none(Options), {none, Options}).
|
||||||
|
|
||||||
|
-define(DEFAULT_OPTIONS,
|
||||||
|
#{strict_mode => false,
|
||||||
|
max_size => ?MAX_PACKET_SIZE,
|
||||||
|
version => ?MQTT_PROTO_V4
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Init Parse State
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(initial_parse_state() -> {none, options()}).
|
||||||
|
initial_parse_state() ->
|
||||||
|
initial_parse_state(#{}).
|
||||||
|
|
||||||
|
-spec(initial_parse_state(options()) -> {none, options()}).
|
||||||
|
initial_parse_state(Options) when is_map(Options) ->
|
||||||
|
?none(merge_opts(Options)).
|
||||||
|
|
||||||
|
%% @pivate
|
||||||
|
merge_opts(Options) ->
|
||||||
|
maps:merge(?DEFAULT_OPTIONS, Options).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Parse MQTT Frame
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(parse(binary()) -> parse_result()).
|
||||||
|
parse(Bin) ->
|
||||||
|
parse(Bin, initial_parse_state()).
|
||||||
|
|
||||||
|
-spec(parse(binary(), parse_state()) -> parse_result()).
|
||||||
|
parse(<<>>, {none, Options}) ->
|
||||||
|
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
|
||||||
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
||||||
|
{none, Options = #{strict_mode := StrictMode}}) ->
|
||||||
|
%% Validate header if strict mode.
|
||||||
|
StrictMode andalso validate_header(Type, Dup, QoS, Retain),
|
||||||
|
Header = #mqtt_packet_header{type = Type,
|
||||||
|
dup = bool(Dup),
|
||||||
|
qos = QoS,
|
||||||
|
retain = bool(Retain)
|
||||||
|
},
|
||||||
|
Header1 = case fixqos(Type, QoS) of
|
||||||
|
QoS -> Header;
|
||||||
|
FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
|
||||||
|
end,
|
||||||
|
parse_remaining_len(Rest, Header1, Options);
|
||||||
|
parse(Bin, Cont) when is_binary(Bin), is_function(Cont) ->
|
||||||
|
Cont(Bin).
|
||||||
|
|
||||||
|
parse_remaining_len(<<>>, Header, Options) ->
|
||||||
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
|
||||||
|
parse_remaining_len(Rest, Header, Options) ->
|
||||||
|
parse_remaining_len(Rest, Header, 1, 0, Options).
|
||||||
|
|
||||||
|
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize ->
|
||||||
|
error(frame_too_large);
|
||||||
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
||||||
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
|
||||||
|
%% Match DISCONNECT without payload
|
||||||
|
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
||||||
|
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
||||||
|
{ok, Packet, Rest, ?none(Options)};
|
||||||
|
%% Match PINGREQ.
|
||||||
|
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
|
parse_frame(Rest, Header, 0, Options);
|
||||||
|
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
||||||
|
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
|
parse_frame(Rest, Header, 2, Options);
|
||||||
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
||||||
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
||||||
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
||||||
|
Options = #{max_size := MaxSize}) ->
|
||||||
|
FrameLen = Value + Len * Multiplier,
|
||||||
|
if
|
||||||
|
FrameLen > MaxSize ->
|
||||||
|
error(frame_too_large);
|
||||||
|
true ->
|
||||||
|
parse_frame(Rest, Header, FrameLen, Options)
|
||||||
|
end.
|
||||||
|
|
||||||
|
parse_frame(Bin, Header, 0, Options) ->
|
||||||
|
{ok, packet(Header), Bin, ?none(Options)};
|
||||||
|
|
||||||
|
parse_frame(Bin, Header, Length, Options) ->
|
||||||
|
case Bin of
|
||||||
|
<<FrameBin:Length/binary, Rest/binary>> ->
|
||||||
|
case parse_packet(Header, FrameBin, Options) of
|
||||||
|
{Variable, Payload} ->
|
||||||
|
{ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
|
||||||
|
Variable = #mqtt_packet_connect{proto_ver = Ver} ->
|
||||||
|
{ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})};
|
||||||
|
Variable ->
|
||||||
|
{ok, packet(Header, Variable), Rest, ?none(Options)}
|
||||||
|
end;
|
||||||
|
TooShortBin ->
|
||||||
|
{more, fun(BinMore) ->
|
||||||
|
parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, Options)
|
||||||
|
end}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-compile({inline, [packet/1, packet/2, packet/3]}).
|
||||||
|
packet(Header) ->
|
||||||
|
#mqtt_packet{header = Header}.
|
||||||
|
packet(Header, Variable) ->
|
||||||
|
#mqtt_packet{header = Header, variable = Variable}.
|
||||||
|
packet(Header, Variable, Payload) ->
|
||||||
|
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
||||||
|
{ProtoName, Rest} = parse_utf8_string(FrameBin),
|
||||||
|
<<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
|
||||||
|
% Note: Crash when reserved flag doesn't equal to 0, there is no strict
|
||||||
|
% compliance with the MQTT5.0.
|
||||||
|
<<UsernameFlag : 1,
|
||||||
|
PasswordFlag : 1,
|
||||||
|
WillRetain : 1,
|
||||||
|
WillQoS : 2,
|
||||||
|
WillFlag : 1,
|
||||||
|
CleanStart : 1,
|
||||||
|
0 : 1,
|
||||||
|
KeepAlive : 16/big,
|
||||||
|
Rest2/binary>> = Rest1,
|
||||||
|
|
||||||
|
{Properties, Rest3} = parse_properties(Rest2, ProtoVer),
|
||||||
|
{ClientId, Rest4} = parse_utf8_string(Rest3),
|
||||||
|
ConnPacket = #mqtt_packet_connect{proto_name = ProtoName,
|
||||||
|
proto_ver = ProtoVer,
|
||||||
|
is_bridge = (BridgeTag =:= 8),
|
||||||
|
clean_start = bool(CleanStart),
|
||||||
|
will_flag = bool(WillFlag),
|
||||||
|
will_qos = WillQoS,
|
||||||
|
will_retain = bool(WillRetain),
|
||||||
|
keepalive = KeepAlive,
|
||||||
|
properties = Properties,
|
||||||
|
clientid = ClientId
|
||||||
|
},
|
||||||
|
{ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4),
|
||||||
|
{Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)),
|
||||||
|
{Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)),
|
||||||
|
ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?CONNACK},
|
||||||
|
<<AckFlags:8, ReasonCode:8, Rest/binary>>, #{version := Ver}) ->
|
||||||
|
{Properties, <<>>} = parse_properties(Rest, Ver),
|
||||||
|
#mqtt_packet_connack{ack_flags = AckFlags,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
||||||
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
|
{TopicName, Rest} = parse_utf8_string(Bin),
|
||||||
|
{PacketId, Rest1} = case QoS of
|
||||||
|
?QOS_0 -> {undefined, Rest};
|
||||||
|
_ -> parse_packet_id(Rest)
|
||||||
|
end,
|
||||||
|
(PacketId =/= undefined) andalso
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
{Properties, Payload} = parse_properties(Rest1, Ver),
|
||||||
|
Publish = #mqtt_packet_publish{topic_name = TopicName,
|
||||||
|
packet_id = PacketId,
|
||||||
|
properties = Properties
|
||||||
|
},
|
||||||
|
{Publish, Payload};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>, #{strict_mode := StrictMode})
|
||||||
|
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
#mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode, Rest/binary>>,
|
||||||
|
#{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5})
|
||||||
|
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
{Properties, <<>>} = parse_properties(Rest, Ver),
|
||||||
|
#mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
||||||
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
|
TopicFilters = parse_topic_filters(subscribe, Rest1),
|
||||||
|
ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]),
|
||||||
|
#mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary>>,
|
||||||
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
|
ReasonCodes = parse_reason_codes(Rest1),
|
||||||
|
#mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
||||||
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
|
TopicFilters = parse_topic_filters(unsubscribe, Rest1),
|
||||||
|
#mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big>>,
|
||||||
|
#{strict_mode := StrictMode}) ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
#mqtt_packet_unsuback{packet_id = PacketId};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/binary>>,
|
||||||
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
|
ReasonCodes = parse_reason_codes(Rest1),
|
||||||
|
#mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?DISCONNECT}, <<ReasonCode, Rest/binary>>,
|
||||||
|
#{version := ?MQTT_PROTO_V5}) ->
|
||||||
|
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
|
||||||
|
#mqtt_packet_disconnect{reason_code = ReasonCode,
|
||||||
|
properties = Properties
|
||||||
|
};
|
||||||
|
|
||||||
|
parse_packet(#mqtt_packet_header{type = ?AUTH}, <<ReasonCode, Rest/binary>>,
|
||||||
|
#{version := ?MQTT_PROTO_V5}) ->
|
||||||
|
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
|
||||||
|
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
|
||||||
|
|
||||||
|
parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
||||||
|
proto_ver = Ver}, Bin) ->
|
||||||
|
{Props, Rest} = parse_properties(Bin, Ver),
|
||||||
|
{Topic, Rest1} = parse_utf8_string(Rest),
|
||||||
|
{Payload, Rest2} = parse_binary_data(Rest1),
|
||||||
|
{Packet#mqtt_packet_connect{will_props = Props,
|
||||||
|
will_topic = Topic,
|
||||||
|
will_payload = Payload
|
||||||
|
}, Rest2};
|
||||||
|
parse_will_message(Packet, Bin) -> {Packet, Bin}.
|
||||||
|
|
||||||
|
-compile({inline, [parse_packet_id/1]}).
|
||||||
|
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
||||||
|
{PacketId, Rest}.
|
||||||
|
|
||||||
|
parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
||||||
|
{undefined, Bin};
|
||||||
|
%% TODO: version mess?
|
||||||
|
parse_properties(<<>>, ?MQTT_PROTO_V5) ->
|
||||||
|
{#{}, <<>>};
|
||||||
|
parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) ->
|
||||||
|
{#{}, Rest};
|
||||||
|
parse_properties(Bin, ?MQTT_PROTO_V5) ->
|
||||||
|
{Len, Rest} = parse_variable_byte_integer(Bin),
|
||||||
|
<<PropsBin:Len/binary, Rest1/binary>> = Rest,
|
||||||
|
{parse_property(PropsBin, #{}), Rest1}.
|
||||||
|
|
||||||
|
parse_property(<<>>, Props) ->
|
||||||
|
Props;
|
||||||
|
parse_property(<<16#01, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Payload-Format-Indicator' => Val});
|
||||||
|
parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Message-Expiry-Interval' => Val});
|
||||||
|
parse_property(<<16#03, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Content-Type' => Val});
|
||||||
|
parse_property(<<16#08, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Response-Topic' => Val});
|
||||||
|
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Correlation-Data' => Val});
|
||||||
|
parse_property(<<16#0B, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_variable_byte_integer(Bin),
|
||||||
|
parse_property(Rest, Props#{'Subscription-Identifier' => Val});
|
||||||
|
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val});
|
||||||
|
parse_property(<<16#12, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val});
|
||||||
|
parse_property(<<16#13, Val:16, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Server-Keep-Alive' => Val});
|
||||||
|
parse_property(<<16#15, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Authentication-Method' => Val});
|
||||||
|
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Authentication-Data' => Val});
|
||||||
|
parse_property(<<16#17, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Request-Problem-Information' => Val});
|
||||||
|
parse_property(<<16#18, Val:32, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Will-Delay-Interval' => Val});
|
||||||
|
parse_property(<<16#19, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Request-Response-Information' => Val});
|
||||||
|
parse_property(<<16#1A, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Response-Information' => Val});
|
||||||
|
parse_property(<<16#1C, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Server-Reference' => Val});
|
||||||
|
parse_property(<<16#1F, Bin/binary>>, Props) ->
|
||||||
|
{Val, Rest} = parse_utf8_string(Bin),
|
||||||
|
parse_property(Rest, Props#{'Reason-String' => Val});
|
||||||
|
parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Receive-Maximum' => Val});
|
||||||
|
parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val});
|
||||||
|
parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Topic-Alias' => Val});
|
||||||
|
parse_property(<<16#24, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Maximum-QoS' => Val});
|
||||||
|
parse_property(<<16#25, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Retain-Available' => Val});
|
||||||
|
parse_property(<<16#26, Bin/binary>>, Props) ->
|
||||||
|
{Pair, Rest} = parse_utf8_pair(Bin),
|
||||||
|
case maps:find('User-Property', Props) of
|
||||||
|
{ok, UserProps} ->
|
||||||
|
UserProps1 = lists:append(UserProps, [Pair]),
|
||||||
|
parse_property(Rest, Props#{'User-Property' := UserProps1});
|
||||||
|
error ->
|
||||||
|
parse_property(Rest, Props#{'User-Property' => [Pair]})
|
||||||
|
end;
|
||||||
|
parse_property(<<16#27, Val:32, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Maximum-Packet-Size' => Val});
|
||||||
|
parse_property(<<16#28, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val});
|
||||||
|
parse_property(<<16#29, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val});
|
||||||
|
parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
|
||||||
|
parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}).
|
||||||
|
|
||||||
|
parse_variable_byte_integer(Bin) ->
|
||||||
|
parse_variable_byte_integer(Bin, 1, 0).
|
||||||
|
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
||||||
|
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
|
||||||
|
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
||||||
|
{Value + Len * Multiplier, Rest}.
|
||||||
|
|
||||||
|
parse_topic_filters(subscribe, Bin) ->
|
||||||
|
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}}
|
||||||
|
|| <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
|
||||||
|
|
||||||
|
parse_topic_filters(unsubscribe, Bin) ->
|
||||||
|
[Topic || <<Len:16/big, Topic:Len/binary>> <= Bin].
|
||||||
|
|
||||||
|
parse_reason_codes(Bin) ->
|
||||||
|
[Code || <<Code>> <= Bin].
|
||||||
|
|
||||||
|
parse_utf8_pair(<<Len1:16/big, Key:Len1/binary,
|
||||||
|
Len2:16/big, Val:Len2/binary, Rest/binary>>) ->
|
||||||
|
{{Key, Val}, Rest}.
|
||||||
|
|
||||||
|
parse_utf8_string(Bin, false) ->
|
||||||
|
{undefined, Bin};
|
||||||
|
parse_utf8_string(Bin, true) ->
|
||||||
|
parse_utf8_string(Bin).
|
||||||
|
|
||||||
|
parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
||||||
|
{Str, Rest}.
|
||||||
|
|
||||||
|
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
|
||||||
|
{Data, Rest}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Serialize MQTT Packet
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
serialize_fun() -> serialize_fun(?DEFAULT_OPTIONS).
|
||||||
|
|
||||||
|
serialize_fun(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
|
||||||
|
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
|
||||||
|
serialize_fun(#{version => ProtoVer, max_size => MaxSize});
|
||||||
|
|
||||||
|
serialize_fun(#{version := Ver, max_size := MaxSize}) ->
|
||||||
|
fun(Packet) ->
|
||||||
|
IoData = serialize(Packet, Ver),
|
||||||
|
case is_too_large(IoData, MaxSize) of
|
||||||
|
true -> <<>>;
|
||||||
|
false -> IoData
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec(serialize(#mqtt_packet{}) -> iodata()).
|
||||||
|
serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).
|
||||||
|
|
||||||
|
-spec(serialize(#mqtt_packet{}, version()) -> iodata()).
|
||||||
|
serialize(#mqtt_packet{header = Header,
|
||||||
|
variable = Variable,
|
||||||
|
payload = Payload}, Ver) ->
|
||||||
|
serialize(Header, serialize_variable(Variable, Ver), serialize_payload(Payload)).
|
||||||
|
|
||||||
|
serialize(#mqtt_packet_header{type = Type,
|
||||||
|
dup = Dup,
|
||||||
|
qos = QoS,
|
||||||
|
retain = Retain
|
||||||
|
}, VariableBin, PayloadBin)
|
||||||
|
when ?CONNECT =< Type andalso Type =< ?AUTH ->
|
||||||
|
Len = iolist_size(VariableBin) + iolist_size(PayloadBin),
|
||||||
|
[<<Type:4, (flag(Dup)):1, (flag(QoS)):2, (flag(Retain)):1>>,
|
||||||
|
serialize_remaining_len(Len), VariableBin, PayloadBin].
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_connect{
|
||||||
|
proto_name = ProtoName,
|
||||||
|
proto_ver = ProtoVer,
|
||||||
|
is_bridge = IsBridge,
|
||||||
|
clean_start = CleanStart,
|
||||||
|
will_flag = WillFlag,
|
||||||
|
will_qos = WillQoS,
|
||||||
|
will_retain = WillRetain,
|
||||||
|
keepalive = KeepAlive,
|
||||||
|
properties = Properties,
|
||||||
|
clientid = ClientId,
|
||||||
|
will_props = WillProps,
|
||||||
|
will_topic = WillTopic,
|
||||||
|
will_payload = WillPayload,
|
||||||
|
username = Username,
|
||||||
|
password = Password}, _Ver) ->
|
||||||
|
[serialize_binary_data(ProtoName),
|
||||||
|
<<(case IsBridge of
|
||||||
|
true -> 16#80 + ProtoVer;
|
||||||
|
false -> ProtoVer
|
||||||
|
end):8,
|
||||||
|
(flag(Username)):1,
|
||||||
|
(flag(Password)):1,
|
||||||
|
(flag(WillRetain)):1,
|
||||||
|
WillQoS:2,
|
||||||
|
(flag(WillFlag)):1,
|
||||||
|
(flag(CleanStart)):1,
|
||||||
|
0:1,
|
||||||
|
KeepAlive:16/big-unsigned-integer>>,
|
||||||
|
serialize_properties(Properties, ProtoVer),
|
||||||
|
serialize_utf8_string(ClientId),
|
||||||
|
case WillFlag of
|
||||||
|
true -> [serialize_properties(WillProps, ProtoVer),
|
||||||
|
serialize_utf8_string(WillTopic),
|
||||||
|
serialize_binary_data(WillPayload)];
|
||||||
|
false -> <<>>
|
||||||
|
end,
|
||||||
|
serialize_utf8_string(Username, true),
|
||||||
|
serialize_utf8_string(Password, true)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties}, Ver) ->
|
||||||
|
[AckFlags, ReasonCode, serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_publish{topic_name = TopicName,
|
||||||
|
packet_id = PacketId,
|
||||||
|
properties = Properties}, Ver) ->
|
||||||
|
[serialize_utf8_string(TopicName),
|
||||||
|
if
|
||||||
|
PacketId =:= undefined -> <<>>;
|
||||||
|
true -> <<PacketId:16/big-unsigned-integer>>
|
||||||
|
end,
|
||||||
|
serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver)
|
||||||
|
when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
|
||||||
|
<<PacketId:16/big-unsigned-integer>>;
|
||||||
|
serialize_variable(#mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties
|
||||||
|
},
|
||||||
|
Ver = ?MQTT_PROTO_V5) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, ReasonCode,
|
||||||
|
serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters}, Ver) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_topic_filters(subscribe, TopicFilters, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes}, Ver) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_reason_codes(ReasonCodes)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters}, Ver) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_topic_filters(unsubscribe, TopicFilters, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes}, Ver) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_reason_codes(ReasonCodes)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_disconnect{}, Ver)
|
||||||
|
when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
|
||||||
|
<<>>;
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode,
|
||||||
|
properties = Properties},
|
||||||
|
Ver = ?MQTT_PROTO_V5) ->
|
||||||
|
[ReasonCode, serialize_properties(Properties, Ver)];
|
||||||
|
serialize_variable(#mqtt_packet_disconnect{}, _Ver) ->
|
||||||
|
<<>>;
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode,
|
||||||
|
properties = Properties},
|
||||||
|
Ver = ?MQTT_PROTO_V5) ->
|
||||||
|
[ReasonCode, serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) ->
|
||||||
|
<<PacketId:16/big-unsigned-integer>>;
|
||||||
|
serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) ->
|
||||||
|
<<PacketId:16/big-unsigned-integer>>;
|
||||||
|
serialize_variable(undefined, _Ver) ->
|
||||||
|
<<>>.
|
||||||
|
|
||||||
|
serialize_payload(undefined) -> <<>>;
|
||||||
|
serialize_payload(Bin) -> Bin.
|
||||||
|
|
||||||
|
serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
||||||
|
<<>>;
|
||||||
|
serialize_properties(Props, ?MQTT_PROTO_V5) ->
|
||||||
|
serialize_properties(Props).
|
||||||
|
|
||||||
|
serialize_properties(undefined) ->
|
||||||
|
<<0>>;
|
||||||
|
serialize_properties(Props) when map_size(Props) == 0 ->
|
||||||
|
<<0>>;
|
||||||
|
serialize_properties(Props) when is_map(Props) ->
|
||||||
|
Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>,
|
||||||
|
[serialize_variable_byte_integer(byte_size(Bin)), Bin].
|
||||||
|
|
||||||
|
serialize_property(_, undefined) ->
|
||||||
|
<<>>;
|
||||||
|
serialize_property('Payload-Format-Indicator', Val) ->
|
||||||
|
<<16#01, Val>>;
|
||||||
|
serialize_property('Message-Expiry-Interval', Val) ->
|
||||||
|
<<16#02, Val:32/big>>;
|
||||||
|
serialize_property('Content-Type', Val) ->
|
||||||
|
<<16#03, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Response-Topic', Val) ->
|
||||||
|
<<16#08, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Correlation-Data', Val) ->
|
||||||
|
<<16#09, (byte_size(Val)):16, Val/binary>>;
|
||||||
|
serialize_property('Subscription-Identifier', Val) ->
|
||||||
|
<<16#0B, (serialize_variable_byte_integer(Val))/binary>>;
|
||||||
|
serialize_property('Session-Expiry-Interval', Val) ->
|
||||||
|
<<16#11, Val:32/big>>;
|
||||||
|
serialize_property('Assigned-Client-Identifier', Val) ->
|
||||||
|
<<16#12, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Server-Keep-Alive', Val) ->
|
||||||
|
<<16#13, Val:16/big>>;
|
||||||
|
serialize_property('Authentication-Method', Val) ->
|
||||||
|
<<16#15, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Authentication-Data', Val) ->
|
||||||
|
<<16#16, (iolist_size(Val)):16, Val/binary>>;
|
||||||
|
serialize_property('Request-Problem-Information', Val) ->
|
||||||
|
<<16#17, Val>>;
|
||||||
|
serialize_property('Will-Delay-Interval', Val) ->
|
||||||
|
<<16#18, Val:32/big>>;
|
||||||
|
serialize_property('Request-Response-Information', Val) ->
|
||||||
|
<<16#19, Val>>;
|
||||||
|
serialize_property('Response-Information', Val) ->
|
||||||
|
<<16#1A, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Server-Reference', Val) ->
|
||||||
|
<<16#1C, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Reason-String', Val) ->
|
||||||
|
<<16#1F, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Receive-Maximum', Val) ->
|
||||||
|
<<16#21, Val:16/big>>;
|
||||||
|
serialize_property('Topic-Alias-Maximum', Val) ->
|
||||||
|
<<16#22, Val:16/big>>;
|
||||||
|
serialize_property('Topic-Alias', Val) ->
|
||||||
|
<<16#23, Val:16/big>>;
|
||||||
|
serialize_property('Maximum-QoS', Val) ->
|
||||||
|
<<16#24, Val>>;
|
||||||
|
serialize_property('Retain-Available', Val) ->
|
||||||
|
<<16#25, Val>>;
|
||||||
|
serialize_property('User-Property', {Key, Val}) ->
|
||||||
|
<<16#26, (serialize_utf8_pair({Key, Val}))/binary>>;
|
||||||
|
serialize_property('User-Property', Props) when is_list(Props) ->
|
||||||
|
<< <<(serialize_property('User-Property', {Key, Val}))/binary>>
|
||||||
|
|| {Key, Val} <- Props >>;
|
||||||
|
serialize_property('Maximum-Packet-Size', Val) ->
|
||||||
|
<<16#27, Val:32/big>>;
|
||||||
|
serialize_property('Wildcard-Subscription-Available', Val) ->
|
||||||
|
<<16#28, Val>>;
|
||||||
|
serialize_property('Subscription-Identifier-Available', Val) ->
|
||||||
|
<<16#29, Val>>;
|
||||||
|
serialize_property('Shared-Subscription-Available', Val) ->
|
||||||
|
<<16#2A, Val>>.
|
||||||
|
|
||||||
|
serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
|
||||||
|
<< <<(serialize_utf8_string(Topic))/binary,
|
||||||
|
?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >>
|
||||||
|
|| {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}} <- TopicFilters >>;
|
||||||
|
|
||||||
|
serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
|
||||||
|
<< <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>>
|
||||||
|
|| {Topic, #{qos := QoS}} <- TopicFilters >>;
|
||||||
|
|
||||||
|
serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
|
||||||
|
<< <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>.
|
||||||
|
|
||||||
|
serialize_reason_codes(undefined) ->
|
||||||
|
<<>>;
|
||||||
|
serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) ->
|
||||||
|
<< <<Code>> || Code <- ReasonCodes >>.
|
||||||
|
|
||||||
|
serialize_utf8_pair({Name, Value}) ->
|
||||||
|
<< (serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary >>.
|
||||||
|
|
||||||
|
serialize_binary_data(Bin) ->
|
||||||
|
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
||||||
|
|
||||||
|
serialize_utf8_string(undefined, false) ->
|
||||||
|
error(utf8_string_undefined);
|
||||||
|
serialize_utf8_string(undefined, true) ->
|
||||||
|
<<>>;
|
||||||
|
serialize_utf8_string(String, _AllowNull) ->
|
||||||
|
serialize_utf8_string(String).
|
||||||
|
|
||||||
|
serialize_utf8_string(String) ->
|
||||||
|
StringBin = unicode:characters_to_binary(String),
|
||||||
|
Len = byte_size(StringBin),
|
||||||
|
true = (Len =< 16#ffff),
|
||||||
|
<<Len:16/big, StringBin/binary>>.
|
||||||
|
|
||||||
|
serialize_remaining_len(I) ->
|
||||||
|
serialize_variable_byte_integer(I).
|
||||||
|
|
||||||
|
serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
|
||||||
|
<<0:1, N:7>>;
|
||||||
|
serialize_variable_byte_integer(N) ->
|
||||||
|
<<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
|
||||||
|
|
||||||
|
%% Is the frame too large?
|
||||||
|
-spec(is_too_large(iodata(), pos_integer()) -> boolean()).
|
||||||
|
is_too_large(IoData, MaxSize) ->
|
||||||
|
iolist_size(IoData) >= MaxSize.
|
||||||
|
|
||||||
|
get_property(_Key, undefined, Default) ->
|
||||||
|
Default;
|
||||||
|
get_property(Key, Props, Default) ->
|
||||||
|
maps:get(Key, Props, Default).
|
||||||
|
|
||||||
|
%% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags
|
||||||
|
validate_header(?CONNECT, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?CONNACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok;
|
||||||
|
validate_header(?PUBLISH, _, ?QOS_1, _) -> ok;
|
||||||
|
validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok;
|
||||||
|
validate_header(?PUBACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PUBREC, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PUBREL, 0, 1, 0) -> ok;
|
||||||
|
validate_header(?PUBCOMP, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?SUBSCRIBE, 0, 1, 0) -> ok;
|
||||||
|
validate_header(?SUBACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok;
|
||||||
|
validate_header(?UNSUBACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PINGREQ, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PINGRESP, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?DISCONNECT, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?AUTH, 0, 0, 0) -> ok;
|
||||||
|
validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header).
|
||||||
|
|
||||||
|
-compile({inline, [validate_packet_id/1]}).
|
||||||
|
validate_packet_id(0) -> error(bad_packet_id);
|
||||||
|
validate_packet_id(_) -> ok.
|
||||||
|
|
||||||
|
validate_subqos([3|_]) -> error(bad_subqos);
|
||||||
|
validate_subqos([_|T]) -> validate_subqos(T);
|
||||||
|
validate_subqos([]) -> ok.
|
||||||
|
|
||||||
|
bool(0) -> false;
|
||||||
|
bool(1) -> true.
|
||||||
|
|
||||||
|
flag(undefined) -> ?RESERVED;
|
||||||
|
flag(false) -> 0;
|
||||||
|
flag(true) -> 1;
|
||||||
|
flag(X) when is_integer(X) -> X;
|
||||||
|
flag(B) when is_binary(B) -> 1.
|
||||||
|
|
||||||
|
fixqos(?PUBREL, 0) -> 1;
|
||||||
|
fixqos(?SUBSCRIBE, 0) -> 1;
|
||||||
|
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
||||||
|
fixqos(_Type, QoS) -> QoS.
|
||||||
|
|
||||||
172
src/emqtt_props.erl
Normal file
172
src/emqtt_props.erl
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc MQTT5 Properties
|
||||||
|
-module(emqtt_props).
|
||||||
|
|
||||||
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
|
-export([id/1, name/1, filter/2, validate/1]).
|
||||||
|
|
||||||
|
%% For tests
|
||||||
|
-export([all/0]).
|
||||||
|
|
||||||
|
-type(prop_name() :: atom()).
|
||||||
|
-type(prop_id() :: pos_integer()).
|
||||||
|
|
||||||
|
-define(PROPS_TABLE,
|
||||||
|
#{16#01 => {'Payload-Format-Indicator', 'Byte', [?PUBLISH]},
|
||||||
|
16#02 => {'Message-Expiry-Interval', 'Four-Byte-Integer', [?PUBLISH]},
|
||||||
|
16#03 => {'Content-Type', 'UTF8-Encoded-String', [?PUBLISH]},
|
||||||
|
16#08 => {'Response-Topic', 'UTF8-Encoded-String', [?PUBLISH]},
|
||||||
|
16#09 => {'Correlation-Data', 'Binary-Data', [?PUBLISH]},
|
||||||
|
16#0B => {'Subscription-Identifier', 'Variable-Byte-Integer', [?PUBLISH, ?SUBSCRIBE]},
|
||||||
|
16#11 => {'Session-Expiry-Interval', 'Four-Byte-Integer', [?CONNECT, ?CONNACK, ?DISCONNECT]},
|
||||||
|
16#12 => {'Assigned-Client-Identifier', 'UTF8-Encoded-String', [?CONNACK]},
|
||||||
|
16#13 => {'Server-Keep-Alive', 'Two-Byte-Integer', [?CONNACK]},
|
||||||
|
16#15 => {'Authentication-Method', 'UTF8-Encoded-String', [?CONNECT, ?CONNACK, ?AUTH]},
|
||||||
|
16#16 => {'Authentication-Data', 'Binary-Data', [?CONNECT, ?CONNACK, ?AUTH]},
|
||||||
|
16#17 => {'Request-Problem-Information', 'Byte', [?CONNECT]},
|
||||||
|
16#18 => {'Will-Delay-Interval', 'Four-Byte-Integer', ['WILL']},
|
||||||
|
16#19 => {'Request-Response-Information', 'Byte', [?CONNECT]},
|
||||||
|
16#1A => {'Response-Information', 'UTF8-Encoded-String', [?CONNACK]},
|
||||||
|
16#1C => {'Server-Reference', 'UTF8-Encoded-String', [?CONNACK, ?DISCONNECT]},
|
||||||
|
16#1F => {'Reason-String', 'UTF8-Encoded-String', [?CONNACK, ?DISCONNECT, ?PUBACK,
|
||||||
|
?PUBREC, ?PUBREL, ?PUBCOMP,
|
||||||
|
?SUBACK, ?UNSUBACK, ?AUTH]},
|
||||||
|
16#21 => {'Receive-Maximum', 'Two-Byte-Integer', [?CONNECT, ?CONNACK]},
|
||||||
|
16#22 => {'Topic-Alias-Maximum', 'Two-Byte-Integer', [?CONNECT, ?CONNACK]},
|
||||||
|
16#23 => {'Topic-Alias', 'Two-Byte-Integer', [?PUBLISH]},
|
||||||
|
16#24 => {'Maximum-QoS', 'Byte', [?CONNACK]},
|
||||||
|
16#25 => {'Retain-Available', 'Byte', [?CONNACK]},
|
||||||
|
16#26 => {'User-Property', 'UTF8-String-Pair', 'ALL'},
|
||||||
|
16#27 => {'Maximum-Packet-Size', 'Four-Byte-Integer', [?CONNECT, ?CONNACK]},
|
||||||
|
16#28 => {'Wildcard-Subscription-Available', 'Byte', [?CONNACK]},
|
||||||
|
16#29 => {'Subscription-Identifier-Available', 'Byte', [?CONNACK]},
|
||||||
|
16#2A => {'Shared-Subscription-Available', 'Byte', [?CONNACK]}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-spec(id(prop_name()) -> prop_id()).
|
||||||
|
id('Payload-Format-Indicator') -> 16#01;
|
||||||
|
id('Message-Expiry-Interval') -> 16#02;
|
||||||
|
id('Content-Type') -> 16#03;
|
||||||
|
id('Response-Topic') -> 16#08;
|
||||||
|
id('Correlation-Data') -> 16#09;
|
||||||
|
id('Subscription-Identifier') -> 16#0B;
|
||||||
|
id('Session-Expiry-Interval') -> 16#11;
|
||||||
|
id('Assigned-Client-Identifier') -> 16#12;
|
||||||
|
id('Server-Keep-Alive') -> 16#13;
|
||||||
|
id('Authentication-Method') -> 16#15;
|
||||||
|
id('Authentication-Data') -> 16#16;
|
||||||
|
id('Request-Problem-Information') -> 16#17;
|
||||||
|
id('Will-Delay-Interval') -> 16#18;
|
||||||
|
id('Request-Response-Information') -> 16#19;
|
||||||
|
id('Response-Information') -> 16#1A;
|
||||||
|
id('Server-Reference') -> 16#1C;
|
||||||
|
id('Reason-String') -> 16#1F;
|
||||||
|
id('Receive-Maximum') -> 16#21;
|
||||||
|
id('Topic-Alias-Maximum') -> 16#22;
|
||||||
|
id('Topic-Alias') -> 16#23;
|
||||||
|
id('Maximum-QoS') -> 16#24;
|
||||||
|
id('Retain-Available') -> 16#25;
|
||||||
|
id('User-Property') -> 16#26;
|
||||||
|
id('Maximum-Packet-Size') -> 16#27;
|
||||||
|
id('Wildcard-Subscription-Available') -> 16#28;
|
||||||
|
id('Subscription-Identifier-Available') -> 16#29;
|
||||||
|
id('Shared-Subscription-Available') -> 16#2A;
|
||||||
|
id(Name) -> error({bad_property, Name}).
|
||||||
|
|
||||||
|
-spec(name(prop_id()) -> prop_name()).
|
||||||
|
name(16#01) -> 'Payload-Format-Indicator';
|
||||||
|
name(16#02) -> 'Message-Expiry-Interval';
|
||||||
|
name(16#03) -> 'Content-Type';
|
||||||
|
name(16#08) -> 'Response-Topic';
|
||||||
|
name(16#09) -> 'Correlation-Data';
|
||||||
|
name(16#0B) -> 'Subscription-Identifier';
|
||||||
|
name(16#11) -> 'Session-Expiry-Interval';
|
||||||
|
name(16#12) -> 'Assigned-Client-Identifier';
|
||||||
|
name(16#13) -> 'Server-Keep-Alive';
|
||||||
|
name(16#15) -> 'Authentication-Method';
|
||||||
|
name(16#16) -> 'Authentication-Data';
|
||||||
|
name(16#17) -> 'Request-Problem-Information';
|
||||||
|
name(16#18) -> 'Will-Delay-Interval';
|
||||||
|
name(16#19) -> 'Request-Response-Information';
|
||||||
|
name(16#1A) -> 'Response-Information';
|
||||||
|
name(16#1C) -> 'Server-Reference';
|
||||||
|
name(16#1F) -> 'Reason-String';
|
||||||
|
name(16#21) -> 'Receive-Maximum';
|
||||||
|
name(16#22) -> 'Topic-Alias-Maximum';
|
||||||
|
name(16#23) -> 'Topic-Alias';
|
||||||
|
name(16#24) -> 'Maximum-QoS';
|
||||||
|
name(16#25) -> 'Retain-Available';
|
||||||
|
name(16#26) -> 'User-Property';
|
||||||
|
name(16#27) -> 'Maximum-Packet-Size';
|
||||||
|
name(16#28) -> 'Wildcard-Subscription-Available';
|
||||||
|
name(16#29) -> 'Subscription-Identifier-Available';
|
||||||
|
name(16#2A) -> 'Shared-Subscription-Available';
|
||||||
|
name(Id) -> error({unsupported_property, Id}).
|
||||||
|
|
||||||
|
filter(PacketType, Props) when is_map(Props) ->
|
||||||
|
maps:from_list(filter(PacketType, maps:to_list(Props)));
|
||||||
|
|
||||||
|
filter(PacketType, Props) when ?CONNECT =< PacketType, PacketType =< ?AUTH, is_list(Props) ->
|
||||||
|
Filter = fun(Name) ->
|
||||||
|
case maps:find(id(Name), ?PROPS_TABLE) of
|
||||||
|
{ok, {Name, _Type, 'ALL'}} ->
|
||||||
|
true;
|
||||||
|
{ok, {Name, _Type, AllowedTypes}} ->
|
||||||
|
lists:member(PacketType, AllowedTypes);
|
||||||
|
error -> false
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
[Prop || Prop = {Name, _} <- Props, Filter(Name)].
|
||||||
|
|
||||||
|
validate(Props) when is_map(Props) ->
|
||||||
|
lists:foreach(fun validate_prop/1, maps:to_list(Props)).
|
||||||
|
|
||||||
|
validate_prop(Prop = {Name, Val}) ->
|
||||||
|
case maps:find(id(Name), ?PROPS_TABLE) of
|
||||||
|
{ok, {Name, Type, _}} ->
|
||||||
|
validate_value(Type, Val)
|
||||||
|
orelse error(bad_property, Prop);
|
||||||
|
error ->
|
||||||
|
error({bad_property, Prop})
|
||||||
|
end.
|
||||||
|
|
||||||
|
validate_value('Byte', Val) ->
|
||||||
|
is_integer(Val) andalso Val =< 16#FF;
|
||||||
|
validate_value('Two-Byte-Integer', Val) ->
|
||||||
|
is_integer(Val) andalso 0 =< Val andalso Val =< 16#FFFF;
|
||||||
|
validate_value('Four-Byte-Integer', Val) ->
|
||||||
|
is_integer(Val) andalso 0 =< Val andalso Val =< 16#FFFFFFFF;
|
||||||
|
validate_value('Variable-Byte-Integer', Val) ->
|
||||||
|
is_integer(Val) andalso 0 =< Val andalso Val =< 16#7FFFFFFF;
|
||||||
|
validate_value('UTF8-String-Pair', {Name, Val}) ->
|
||||||
|
validate_value('UTF8-Encoded-String', Name)
|
||||||
|
andalso validate_value('UTF8-Encoded-String', Val);
|
||||||
|
validate_value('UTF8-String-Pair', Pairs) when is_list(Pairs) ->
|
||||||
|
lists:foldl(fun(Pair, OK) ->
|
||||||
|
OK andalso validate_value('UTF8-String-Pair', Pair)
|
||||||
|
end, true, Pairs);
|
||||||
|
validate_value('UTF8-Encoded-String', Val) ->
|
||||||
|
is_binary(Val);
|
||||||
|
validate_value('Binary-Data', Val) ->
|
||||||
|
is_binary(Val);
|
||||||
|
validate_value(_Type, _Val) -> false.
|
||||||
|
|
||||||
|
-spec(all() -> map()).
|
||||||
|
all() -> ?PROPS_TABLE.
|
||||||
|
|
||||||
120
src/emqtt_sock.erl
Normal file
120
src/emqtt_sock.erl
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqtt_sock).
|
||||||
|
|
||||||
|
-export([connect/4, send/2, recv/2, close/1 ]).
|
||||||
|
|
||||||
|
-export([ sockname/1, setopts/2, getstat/2 ]).
|
||||||
|
|
||||||
|
-record(ssl_socket, {
|
||||||
|
tcp,
|
||||||
|
ssl
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type(socket() :: inet:socket() | #ssl_socket{}).
|
||||||
|
|
||||||
|
-type(sockname() :: {inet:ip_address(), inet:port_number()}).
|
||||||
|
|
||||||
|
-type(option() :: gen_tcp:connect_option() | {ssl_opts, [ssl:ssl_option()]}).
|
||||||
|
|
||||||
|
-export_type([socket/0, option/0]).
|
||||||
|
|
||||||
|
-define(DEFAULT_TCP_OPTIONS, [binary, {packet, raw}, {active, false},
|
||||||
|
{nodelay, true}]).
|
||||||
|
|
||||||
|
-spec(connect(inet:ip_address() | inet:hostname(),
|
||||||
|
inet:port_number(), [option()], timeout())
|
||||||
|
-> {ok, socket()} | {error, term()}).
|
||||||
|
connect(Host, Port, SockOpts, Timeout) ->
|
||||||
|
TcpOpts = merge_opts(?DEFAULT_TCP_OPTIONS,
|
||||||
|
lists:keydelete(ssl_opts, 1, SockOpts)),
|
||||||
|
case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of
|
||||||
|
{ok, Sock} ->
|
||||||
|
case lists:keyfind(ssl_opts, 1, SockOpts) of
|
||||||
|
{ssl_opts, SslOpts} ->
|
||||||
|
ssl_upgrade(Sock, SslOpts, Timeout);
|
||||||
|
false ->
|
||||||
|
{ok, Sock}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
ssl_upgrade(Sock, SslOpts, Timeout) ->
|
||||||
|
TlsVersions = proplists:get_value(versions, SslOpts, []),
|
||||||
|
Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)),
|
||||||
|
SslOpts2 = merge_opts(SslOpts, [{ciphers, Ciphers}]),
|
||||||
|
case ssl:connect(Sock, SslOpts2, Timeout) of
|
||||||
|
{ok, SslSock} ->
|
||||||
|
ok = ssl:controlling_process(SslSock, self()),
|
||||||
|
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec(send(socket(), iodata()) -> ok | {error, einval | closed}).
|
||||||
|
send(Sock, Data) when is_port(Sock) ->
|
||||||
|
gen_tcp:send(Sock, Data);
|
||||||
|
send(#ssl_socket{ssl = SslSock}, Data) ->
|
||||||
|
ssl:send(SslSock, Data).
|
||||||
|
|
||||||
|
-spec(recv(socket(), non_neg_integer())
|
||||||
|
-> {ok, iodata()} | {error, closed | inet:posix()}).
|
||||||
|
recv(Sock, Length) when is_port(Sock) ->
|
||||||
|
gen_tcp:recv(Sock, Length);
|
||||||
|
recv(#ssl_socket{ssl = SslSock}, Length) ->
|
||||||
|
ssl:recv(SslSock, Length).
|
||||||
|
|
||||||
|
-spec(close(socket()) -> ok).
|
||||||
|
close(Sock) when is_port(Sock) ->
|
||||||
|
gen_tcp:close(Sock);
|
||||||
|
close(#ssl_socket{ssl = SslSock}) ->
|
||||||
|
ssl:close(SslSock).
|
||||||
|
|
||||||
|
-spec(setopts(socket(), [gen_tcp:option() | ssl:socketoption()]) -> ok).
|
||||||
|
setopts(Sock, Opts) when is_port(Sock) ->
|
||||||
|
inet:setopts(Sock, Opts);
|
||||||
|
setopts(#ssl_socket{ssl = SslSock}, Opts) ->
|
||||||
|
ssl:setopts(SslSock, Opts).
|
||||||
|
|
||||||
|
-spec(getstat(socket(), [atom()])
|
||||||
|
-> {ok, [{atom(), integer()}]} | {error, term()}).
|
||||||
|
getstat(Sock, Options) when is_port(Sock) ->
|
||||||
|
inet:getstat(Sock, Options);
|
||||||
|
getstat(#ssl_socket{tcp = Sock}, Options) ->
|
||||||
|
inet:getstat(Sock, Options).
|
||||||
|
|
||||||
|
-spec(sockname(socket()) -> {ok, sockname()} | {error, term()}).
|
||||||
|
sockname(Sock) when is_port(Sock) ->
|
||||||
|
inet:sockname(Sock);
|
||||||
|
sockname(#ssl_socket{ssl = SslSock}) ->
|
||||||
|
ssl:sockname(SslSock).
|
||||||
|
|
||||||
|
-spec(merge_opts(list(), list()) -> list()).
|
||||||
|
merge_opts(Defaults, Options) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({Opt, Val}, Acc) ->
|
||||||
|
lists:keystore(Opt, 1, Acc, {Opt, Val});
|
||||||
|
(Opt, Acc) ->
|
||||||
|
lists:usort([Opt | Acc])
|
||||||
|
end, Defaults, Options).
|
||||||
|
|
||||||
|
default_ciphers(TlsVersions) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(TlsVer, Ciphers) ->
|
||||||
|
Ciphers ++ ssl:cipher_suites(all, TlsVer)
|
||||||
|
end, [], TlsVersions).
|
||||||
Loading…
x
Reference in New Issue
Block a user