diff --git a/apps/endpoint/src/endpoint.app.src b/apps/endpoint/src/endpoint.app.src index 30e33cc..b10c10f 100644 --- a/apps/endpoint/src/endpoint.app.src +++ b/apps/endpoint/src/endpoint.app.src @@ -4,7 +4,9 @@ {registered, []}, {mod, {endpoint_app, []}}, {applications, - [kernel, + [ + emqtt, + kernel, stdlib ]}, {env,[]}, diff --git a/apps/iot/include/emqtt.hrl b/apps/iot/include/emqtt.hrl deleted file mode 100644 index 48cc014..0000000 --- a/apps/iot/include/emqtt.hrl +++ /dev/null @@ -1,535 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --ifndef(EMQTT_HRL). --define(EMQTT_HRL, true). - -%%-------------------------------------------------------------------- -%% MQTT Protocol Version and Names -%%-------------------------------------------------------------------- - --define(MQTT_PROTO_V3, 3). --define(MQTT_PROTO_V4, 4). --define(MQTT_PROTO_V5, 5). - --define(PROTOCOL_NAMES, [ - {?MQTT_PROTO_V3, <<"MQIsdp">>}, - {?MQTT_PROTO_V4, <<"MQTT">>}, - {?MQTT_PROTO_V5, <<"MQTT">>}]). - -%%-------------------------------------------------------------------- -%% MQTT QoS Levels -%%-------------------------------------------------------------------- - --define(QOS_0, 0). %% At most once --define(QOS_1, 1). %% At least once --define(QOS_2, 2). %% Exactly once - --define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). - --define(QOS_I(Name), - begin - (case Name of - ?QOS_0 -> ?QOS_0; - qos0 -> ?QOS_0; - at_most_once -> ?QOS_0; - ?QOS_1 -> ?QOS_1; - qos1 -> ?QOS_1; - at_least_once -> ?QOS_1; - ?QOS_2 -> ?QOS_2; - qos2 -> ?QOS_2; - exactly_once -> ?QOS_2 - end) - end). - --define(IS_QOS_NAME(I), - (I =:= qos0 orelse I =:= at_most_once orelse - I =:= qos1 orelse I =:= at_least_once orelse - I =:= qos2 orelse I =:= exactly_once)). - -%%-------------------------------------------------------------------- -%% Maximum ClientId Length. -%%-------------------------------------------------------------------- - --define(MAX_CLIENTID_LEN, 65535). - -%%-------------------------------------------------------------------- -%% MQTT Control Packet Types -%%-------------------------------------------------------------------- - --define(RESERVED, 0). %% Reserved --define(CONNECT, 1). %% Client request to connect to Server --define(CONNACK, 2). %% Server to Client: Connect acknowledgment --define(PUBLISH, 3). %% Publish message --define(PUBACK, 4). %% Publish acknowledgment --define(PUBREC, 5). %% Publish received (assured delivery part 1) --define(PUBREL, 6). %% Publish release (assured delivery part 2) --define(PUBCOMP, 7). %% Publish complete (assured delivery part 3) --define(SUBSCRIBE, 8). %% Client subscribe request --define(SUBACK, 9). %% Server Subscribe acknowledgment --define(UNSUBSCRIBE, 10). %% Unsubscribe request --define(UNSUBACK, 11). %% Unsubscribe acknowledgment --define(PINGREQ, 12). %% PING request --define(PINGRESP, 13). %% PING response --define(DISCONNECT, 14). %% Client or Server is disconnecting --define(AUTH, 15). %% Authentication exchange - --define(TYPE_NAMES, [ - 'CONNECT', - 'CONNACK', - 'PUBLISH', - 'PUBACK', - 'PUBREC', - 'PUBREL', - 'PUBCOMP', - 'SUBSCRIBE', - 'SUBACK', - 'UNSUBSCRIBE', - 'UNSUBACK', - 'PINGREQ', - 'PINGRESP', - 'DISCONNECT', - 'AUTH']). - -%%-------------------------------------------------------------------- -%% MQTT V3.1.1 Connect Return Codes -%%-------------------------------------------------------------------- - --define(CONNACK_ACCEPT, 0). %% Connection accepted --define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version --define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server --define(CONNACK_SERVER, 3). %% Server unavailable --define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed --define(CONNACK_AUTH, 5). %% Client is not authorized to connect - -%%-------------------------------------------------------------------- -%% MQTT V5.0 Reason Codes -%%-------------------------------------------------------------------- - --define(RC_SUCCESS, 16#00). --define(RC_NORMAL_DISCONNECTION, 16#00). --define(RC_GRANTED_QOS_0, 16#00). --define(RC_GRANTED_QOS_1, 16#01). --define(RC_GRANTED_QOS_2, 16#02). --define(RC_DISCONNECT_WITH_WILL_MESSAGE, 16#04). --define(RC_NO_MATCHING_SUBSCRIBERS, 16#10). --define(RC_NO_SUBSCRIPTION_EXISTED, 16#11). --define(RC_CONTINUE_AUTHENTICATION, 16#18). --define(RC_RE_AUTHENTICATE, 16#19). --define(RC_UNSPECIFIED_ERROR, 16#80). --define(RC_MALFORMED_PACKET, 16#81). --define(RC_PROTOCOL_ERROR, 16#82). --define(RC_IMPLEMENTATION_SPECIFIC_ERROR, 16#83). --define(RC_UNSUPPORTED_PROTOCOL_VERSION, 16#84). --define(RC_CLIENT_IDENTIFIER_NOT_VALID, 16#85). --define(RC_BAD_USER_NAME_OR_PASSWORD, 16#86). --define(RC_NOT_AUTHORIZED, 16#87). --define(RC_SERVER_UNAVAILABLE, 16#88). --define(RC_SERVER_BUSY, 16#89). --define(RC_BANNED, 16#8A). --define(RC_SERVER_SHUTTING_DOWN, 16#8B). --define(RC_BAD_AUTHENTICATION_METHOD, 16#8C). --define(RC_KEEP_ALIVE_TIMEOUT, 16#8D). --define(RC_SESSION_TAKEN_OVER, 16#8E). --define(RC_TOPIC_FILTER_INVALID, 16#8F). --define(RC_TOPIC_NAME_INVALID, 16#90). --define(RC_PACKET_IDENTIFIER_IN_USE, 16#91). --define(RC_PACKET_IDENTIFIER_NOT_FOUND, 16#92). --define(RC_RECEIVE_MAXIMUM_EXCEEDED, 16#93). --define(RC_TOPIC_ALIAS_INVALID, 16#94). --define(RC_PACKET_TOO_LARGE, 16#95). --define(RC_MESSAGE_RATE_TOO_HIGH, 16#96). --define(RC_QUOTA_EXCEEDED, 16#97). --define(RC_ADMINISTRATIVE_ACTION, 16#98). --define(RC_PAYLOAD_FORMAT_INVALID, 16#99). --define(RC_RETAIN_NOT_SUPPORTED, 16#9A). --define(RC_QOS_NOT_SUPPORTED, 16#9B). --define(RC_USE_ANOTHER_SERVER, 16#9C). --define(RC_SERVER_MOVED, 16#9D). --define(RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED, 16#9E). --define(RC_CONNECTION_RATE_EXCEEDED, 16#9F). --define(RC_MAXIMUM_CONNECT_TIME, 16#A0). --define(RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED, 16#A1). --define(RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED, 16#A2). - -%%-------------------------------------------------------------------- -%% Maximum MQTT Packet ID and Length -%%-------------------------------------------------------------------- - --define(MAX_PACKET_ID, 16#ffff). --define(MAX_PACKET_SIZE, 16#fffffff). - -%%-------------------------------------------------------------------- -%% MQTT Frame Mask -%%-------------------------------------------------------------------- - --define(HIGHBIT, 2#10000000). --define(LOWBITS, 2#01111111). - -%%-------------------------------------------------------------------- -%% MQTT Packet Fixed Header -%%-------------------------------------------------------------------- - --record(mqtt_packet_header, { - type = ?RESERVED, - dup = false, - qos = ?QOS_0, - retain = false - }). - -%%-------------------------------------------------------------------- -%% MQTT Packets -%%-------------------------------------------------------------------- - --define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling - rap => 0, %% Retain as Publish - nl => 0, %% No Local - qos => 0 %% QoS - }). - --record(mqtt_packet_connect, { - proto_name = <<"MQTT">>, - proto_ver = ?MQTT_PROTO_V4, - is_bridge = false, - clean_start = true, - will_flag = false, - will_qos = ?QOS_0, - will_retain = false, - keepalive = 0, - properties = undefined, - clientid = <<>>, - will_props = undefined, - will_topic = undefined, - will_payload = undefined, - username = undefined, - password = undefined - }). - --record(mqtt_packet_connack, { - ack_flags, - reason_code, - properties - }). - --record(mqtt_packet_publish, { - topic_name, - packet_id, - properties - }). - --record(mqtt_packet_puback, { - packet_id, - reason_code, - properties - }). - --record(mqtt_packet_subscribe, { - packet_id, - properties, - topic_filters - }). - --record(mqtt_packet_suback, { - packet_id, - properties, - reason_codes - }). - --record(mqtt_packet_unsubscribe, { - packet_id, - properties, - topic_filters - }). - --record(mqtt_packet_unsuback, { - packet_id, - properties, - reason_codes - }). - --record(mqtt_packet_disconnect, { - reason_code, - properties - }). - --record(mqtt_packet_auth, { - reason_code, - properties - }). - -%%-------------------------------------------------------------------- -%% MQTT Message -%%-------------------------------------------------------------------- - --record(mqtt_msg, { - qos = ?QOS_0 :: emqtt:qos(), - retain = false :: boolean(), - dup = false :: boolean(), - packet_id :: emqtt:packet_id(), - topic :: emqtt:topic(), - props :: emqtt:properties(), - payload :: binary() - }). - -%%-------------------------------------------------------------------- -%% MQTT Control Packet -%%-------------------------------------------------------------------- - --record(mqtt_packet, { - header :: #mqtt_packet_header{}, - variable :: #mqtt_packet_connect{} - | #mqtt_packet_connack{} - | #mqtt_packet_publish{} - | #mqtt_packet_puback{} - | #mqtt_packet_subscribe{} - | #mqtt_packet_suback{} - | #mqtt_packet_unsubscribe{} - | #mqtt_packet_unsuback{} - | #mqtt_packet_disconnect{} - | #mqtt_packet_auth{} - | pos_integer() - | undefined, - payload :: binary() | undefined - }). - -%%-------------------------------------------------------------------- -%% MQTT Packet Match -%%-------------------------------------------------------------------- - --define(CONNECT_PACKET(Var), - #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, - variable = Var}). - --define(CONNACK_PACKET(ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{ack_flags = 0, - reason_code = ReasonCode} - }). - --define(CONNACK_PACKET(ReasonCode, SessPresent), - #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{ack_flags = SessPresent, - reason_code = ReasonCode} - }). - --define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{ack_flags = SessPresent, - reason_code = ReasonCode, - properties = Properties} - }). - --define(AUTH_PACKET(), - #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, - variable = #mqtt_packet_auth{reason_code = 0} - }). - --define(AUTH_PACKET(ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, - variable = #mqtt_packet_auth{reason_code = ReasonCode} - }). - --define(AUTH_PACKET(ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, - variable = #mqtt_packet_auth{reason_code = ReasonCode, - properties = Properties} - }). - --define(PUBLISH_PACKET(QoS), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = QoS}}). - --define(PUBLISH_PACKET(QoS, PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = QoS}, - variable = #mqtt_packet_publish{packet_id = PacketId} - }). - --define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = QoS}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId}, - payload = Payload - }). - --define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = QoS}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = Properties}, - payload = Payload - }). - --define(PUBACK_PACKET(PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0} - }). - --define(PUBACK_PACKET(PacketId, ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode} - }). - --define(PUBACK_PACKET(PacketId, ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties} - }). - --define(PUBREC_PACKET(PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0} - }). - --define(PUBREC_PACKET(PacketId, ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode} - }). - --define(PUBREC_PACKET(PacketId, ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties} - }). - --define(PUBREL_PACKET(PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, - qos = ?QOS_1}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0} - }). - --define(PUBREL_PACKET(PacketId, ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, - qos = ?QOS_1}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode} - }). - --define(PUBREL_PACKET(PacketId, ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, - qos = ?QOS_1}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties} - }). - --define(PUBCOMP_PACKET(PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = 0} - }). - --define(PUBCOMP_PACKET(PacketId, ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode} - }). - --define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, - variable = #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties} - }). - --define(SUBSCRIBE_PACKET(PacketId, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, - qos = ?QOS_1}, - variable = #mqtt_packet_subscribe{packet_id = PacketId, - topic_filters = TopicFilters} - }). - --define(SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, - qos = ?QOS_1}, - variable = #mqtt_packet_subscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters} - }). - --define(SUBACK_PACKET(PacketId, ReasonCodes), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, - variable = #mqtt_packet_suback{packet_id = PacketId, - reason_codes = ReasonCodes} - }). - --define(SUBACK_PACKET(PacketId, Properties, ReasonCodes), - #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, - variable = #mqtt_packet_suback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes} - }). - --define(UNSUBSCRIBE_PACKET(PacketId, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, - qos = ?QOS_1}, - variable = #mqtt_packet_unsubscribe{packet_id = PacketId, - topic_filters = TopicFilters} - }). - --define(UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, - qos = ?QOS_1}, - variable = #mqtt_packet_unsubscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters} - }). - --define(UNSUBACK_PACKET(PacketId), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, - variable = #mqtt_packet_unsuback{packet_id = PacketId} - }). - --define(UNSUBACK_PACKET(PacketId, ReasonCodes), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, - variable = #mqtt_packet_unsuback{packet_id = PacketId, - reason_codes = ReasonCodes} - }). - --define(UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), - #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBACK}, - variable = #mqtt_packet_unsuback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes} - }). - --define(DISCONNECT_PACKET(), - #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, - variable = #mqtt_packet_disconnect{reason_code = 0} - }). - --define(DISCONNECT_PACKET(ReasonCode), - #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, - variable = #mqtt_packet_disconnect{reason_code = ReasonCode} - }). - --define(DISCONNECT_PACKET(ReasonCode, Properties), - #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, - variable = #mqtt_packet_disconnect{reason_code = ReasonCode, - properties = Properties} - }). - --define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). - --endif. diff --git a/apps/iot/src/emqtt/emqtt.erl b/apps/iot/src/emqtt/emqtt.erl deleted file mode 100644 index 586c828..0000000 --- a/apps/iot/src/emqtt/emqtt.erl +++ /dev/null @@ -1,1319 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqtt). - --behaviour(gen_statem). - --include("emqtt.hrl"). - --export([start_link/0, start_link/1]). - --export([connect/1, connect/2, disconnect/1, disconnect/2, disconnect/3]). - --export([ping/1]). - -%% PubSub --export([ subscribe/2, subscribe/3, subscribe/4, publish/2, publish/3, publish/4, publish/5, unsubscribe/2, unsubscribe/3]). - -%% Puback... --export([puback/2, puback/3, puback/4, pubrec/2, pubrec/3, pubrec/4, pubrel/2, pubrel/3, pubrel/4, pubcomp/2, pubcomp/3, pubcomp/4 ]). - --export([subscriptions/1]). - --export([info/1, stop/1]). - -%% For test cases --export([ pause/1, resume/1 ]). - --export([ initialized/3, waiting_for_connack/3, connected/3, inflight_full/3, random_client_id/0, reason_code_name/1 ]). - --export([ init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4 ]). - --export_type([host/0 , option/0 , properties/0 , payload/0 , pubopt/0 , subopt/0 , mqtt_msg/0 , client/0]). - --type(host() :: inet:ip_address() | inet:hostname()). - -%% Message handler is a set of callbacks defined to handle MQTT messages -%% as well as the disconnect event. --define(NO_MSG_HDLR, undefined). - --type(mfas() :: {module(), atom(), list()} | {function(), list()}). - --type(msg_handler() :: #{puback := fun((_) -> any()) | mfas(), - publish := fun((emqx_types:message()) -> any()) | mfas(), - disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) | mfas() - }). - --type(option() :: {name, atom()} - | {owner, pid()} - | {msg_handler, msg_handler()} - | {host, host()} - | {hosts, [{host(), inet:port_number()}]} - | {port, inet:port_number()} - | {tcp_opts, [gen_tcp:option()]} - | {ssl, boolean()} - | {ssl_opts, [ssl:ssl_option()]} - | {ws_path, string()} - | {connect_timeout, pos_integer()} - | {bridge_mode, boolean()} - | {clientid, iodata()} - | {clean_start, boolean()} - | {username, iodata()} - | {password, iodata()} - | {proto_ver, v3 | v4 | v5} - | {keepalive, non_neg_integer()} - | {max_inflight, pos_integer()} - | {retry_interval, timeout()} - | {will_topic, iodata()} - | {will_payload, iodata()} - | {will_retain, boolean()} - | {will_qos, qos()} - | {will_props, properties()} - | {auto_ack, boolean()} - | {ack_timeout, pos_integer()} - | {force_ping, boolean()} - | {properties, properties()}). - --type(maybe(T) :: undefined | T). --type(topic() :: binary()). --type(payload() :: iodata()). --type(packet_id() :: 0..16#FFFF). --type(reason_code() :: 0..16#FF). --type(properties() :: #{atom() => term()}). --type(version() :: ?MQTT_PROTO_V3 - | ?MQTT_PROTO_V4 - | ?MQTT_PROTO_V5). --type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). --type(qos_name() :: qos0 | at_most_once | - qos1 | at_least_once | - qos2 | exactly_once). --type(pubopt() :: {retain, boolean()} - | {qos, qos() | qos_name()}). --type(subopt() :: {rh, 0 | 1 | 2} - | {rap, boolean()} - | {nl, boolean()} - | {qos, qos() | qos_name()}). - --type(subscribe_ret() :: - {ok, properties(), [reason_code()]} | {error, term()}). - --type(client() :: pid() | atom()). - --opaque(mqtt_msg() :: #mqtt_msg{}). - --record(state, { - name :: atom(), - owner :: pid(), - msg_handler :: ?NO_MSG_HDLR | msg_handler(), - host :: host(), - port :: inet:port_number(), - hosts :: [{host(), inet:port_number()}], - socket :: inet:socket() | pid(), - sock_opts :: [emqtt_sock:option()|emqtt_ws:option()], - connect_timeout :: pos_integer(), - bridge_mode :: boolean(), - clientid :: binary(), - clean_start :: boolean(), - username :: maybe(binary()), - password :: maybe(binary()), - proto_ver :: version(), - proto_name :: iodata(), - keepalive :: non_neg_integer(), - keepalive_timer :: maybe(reference()), - force_ping :: boolean(), - paused :: boolean(), - will_flag :: boolean(), - will_msg :: mqtt_msg(), - properties :: properties(), - pending_calls :: list(), - subscriptions :: map(), - max_inflight :: infinity | pos_integer(), - inflight :: #{packet_id() => term()}, - awaiting_rel :: map(), - auto_ack :: boolean(), - ack_timeout :: pos_integer(), - ack_timer :: reference(), - retry_interval :: pos_integer(), - retry_timer :: reference(), - session_present :: boolean(), - last_packet_id :: packet_id(), - parse_state :: emqtt_frame:parse_state() - }). - --record(call, { - id, - from, - req, - ts -}). - -%% Default timeout --define(DEFAULT_KEEPALIVE, 60). --define(DEFAULT_RETRY_INTERVAL, 30000). --define(DEFAULT_ACK_TIMEOUT, 30000). --define(DEFAULT_CONNECT_TIMEOUT, 60000). - --define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}). - --define(WILL_MSG(QoS, Retain, Topic, Props, Payload), - #mqtt_msg{qos = QoS, - retain = Retain, - topic = Topic, - props = Props, - payload = Payload - }). - --define(NO_CLIENT_ID, <<>>). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - --spec(start_link() -> gen_statem:start_ret()). -start_link() -> start_link([]). - --spec(start_link(map() | [option()]) -> gen_statem:start_ret()). -start_link(Options) when is_map(Options) -> - start_link(maps:to_list(Options)); -start_link(Options) when is_list(Options) -> - ok = emqtt_props:validate( - proplists:get_value(properties, Options, #{})), - case proplists:get_value(name, Options) of - undefined -> - gen_statem:start_link(?MODULE, [with_owner(Options)], []); - Name when is_atom(Name) -> - gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], []) - end. - -with_owner(Options) -> - case proplists:get_value(owner, Options) of - Owner when is_pid(Owner) -> - Options; - undefined -> - [{owner, self()} | Options] - end. - --spec(connect(client()) -> {ok, properties()} | {error, term()}). -connect(Client) -> - connect(Client, infinity). -connect(Client, Timeout) -> - gen_statem:call(Client, connect, Timeout). - --spec(subscribe(client(), topic() | {topic(), qos() | qos_name() | [subopt()]} | [{topic(), qos()}]) - -> subscribe_ret()). -subscribe(Client, Topic) when is_binary(Topic) -> - subscribe(Client, {Topic, ?QOS_0}); -subscribe(Client, {Topic, QoS}) when is_binary(Topic), is_atom(QoS) -> - subscribe(Client, {Topic, ?QOS_I(QoS)}); -subscribe(Client, {Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) -> - subscribe(Client, [{Topic, ?QOS_I(QoS)}]); -subscribe(Client, Topics) when is_list(Topics) -> - subscribe(Client, #{}, lists:map( - fun({Topic, QoS}) when is_binary(Topic), is_atom(QoS) -> - {Topic, [{qos, ?QOS_I(QoS)}]}; - ({Topic, QoS}) when is_binary(Topic), ?IS_QOS(QoS) -> - {Topic, [{qos, ?QOS_I(QoS)}]}; - ({Topic, Opts}) when is_binary(Topic), is_list(Opts) -> - {Topic, Opts} - end, Topics)). - --spec(subscribe(client(), topic(), qos() | qos_name() | [subopt()]) -> - subscribe_ret(); - (client(), properties(), [{topic(), qos() | [subopt()]}]) -> - subscribe_ret()). -subscribe(Client, Topic, QoS) when is_binary(Topic), is_atom(QoS) -> - subscribe(Client, Topic, ?QOS_I(QoS)); -subscribe(Client, Topic, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> - subscribe(Client, Topic, [{qos, QoS}]); -subscribe(Client, Topic, Opts) when is_binary(Topic), is_list(Opts) -> - subscribe(Client, #{}, [{Topic, Opts}]); -subscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) -> - Topics1 = [{Topic, parse_subopt(Opts)} || {Topic, Opts} <- Topics], - gen_statem:call(Client, {subscribe, Properties, Topics1}). - --spec(subscribe(client(), properties(), topic(), qos() | qos_name() | [subopt()]) - -> subscribe_ret()). -subscribe(Client, Properties, Topic, QoS) - when is_map(Properties), is_binary(Topic), is_atom(QoS) -> - subscribe(Client, Properties, Topic, ?QOS_I(QoS)); -subscribe(Client, Properties, Topic, QoS) - when is_map(Properties), is_binary(Topic), ?IS_QOS(QoS) -> - subscribe(Client, Properties, Topic, [{qos, QoS}]); -subscribe(Client, Properties, Topic, Opts) - when is_map(Properties), is_binary(Topic), is_list(Opts) -> - subscribe(Client, Properties, [{Topic, Opts}]). - -parse_subopt(Opts) -> - parse_subopt(Opts, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0}). - -parse_subopt([], Result) -> - Result; -parse_subopt([{rh, I} | Opts], Result) when I >= 0, I =< 2 -> - parse_subopt(Opts, Result#{rh := I}); -parse_subopt([{rap, true} | Opts], Result) -> - parse_subopt(Opts, Result#{rap := 1}); -parse_subopt([{rap, false} | Opts], Result) -> - parse_subopt(Opts, Result#{rap := 0}); -parse_subopt([{nl, true} | Opts], Result) -> - parse_subopt(Opts, Result#{nl := 1}); -parse_subopt([{nl, false} | Opts], Result) -> - parse_subopt(Opts, Result#{nl := 0}); -parse_subopt([{qos, QoS} | Opts], Result) -> - parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}); -parse_subopt([_ | Opts], Result) -> - parse_subopt(Opts, Result). - --spec(publish(client(), topic(), payload()) -> ok | {error, term()}). -publish(Client, Topic, Payload) when is_binary(Topic) -> - publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}). - --spec(publish(client(), topic(), payload(), qos() | qos_name() | [pubopt()]) - -> ok | {ok, packet_id()} | {error, term()}). -publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) -> - publish(Client, Topic, Payload, [{qos, ?QOS_I(QoS)}]); -publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> - publish(Client, Topic, Payload, [{qos, QoS}]); -publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) -> - publish(Client, Topic, #{}, Payload, Opts). - --spec(publish(client(), topic(), properties(), payload(), [pubopt()]) - -> ok | {ok, packet_id()} | {error, term()}). -publish(Client, Topic, Properties, Payload, Opts) - when is_binary(Topic), is_map(Properties), is_list(Opts) -> - ok = emqtt_props:validate(Properties), - Retain = proplists:get_bool(retain, Opts), - QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), - publish(Client, #mqtt_msg{qos = QoS, - retain = Retain, - topic = Topic, - props = Properties, - payload = iolist_to_binary(Payload)}). - --spec(publish(client(), #mqtt_msg{}) -> ok | {ok, packet_id()} | {error, term()}). -publish(Client, Msg) -> - gen_statem:call(Client, {publish, Msg}). - --spec(unsubscribe(client(), topic() | [topic()]) -> subscribe_ret()). -unsubscribe(Client, Topic) when is_binary(Topic) -> - unsubscribe(Client, [Topic]); -unsubscribe(Client, Topics) when is_list(Topics) -> - unsubscribe(Client, #{}, Topics). - --spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()). -unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) -> - unsubscribe(Client, Properties, [Topic]); -unsubscribe(Client, Properties, Topics) when is_map(Properties), is_list(Topics) -> - gen_statem:call(Client, {unsubscribe, Properties, Topics}). - --spec(ping(client()) -> pong). -ping(Client) -> - gen_statem:call(Client, ping). - --spec(disconnect(client()) -> ok). -disconnect(Client) -> - disconnect(Client, ?RC_SUCCESS). - --spec(disconnect(client(), reason_code()) -> ok). -disconnect(Client, ReasonCode) -> - disconnect(Client, ReasonCode, #{}). - --spec(disconnect(client(), reason_code(), properties()) -> ok). -disconnect(Client, ReasonCode, Properties) -> - gen_statem:call(Client, {disconnect, ReasonCode, Properties}). - -%%-------------------------------------------------------------------- -%% For test cases -%%-------------------------------------------------------------------- - -puback(Client, PacketId) when is_integer(PacketId) -> - puback(Client, PacketId, ?RC_SUCCESS). -puback(Client, PacketId, ReasonCode) - when is_integer(PacketId), is_integer(ReasonCode) -> - puback(Client, PacketId, ReasonCode, #{}). -puback(Client, PacketId, ReasonCode, Properties) - when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> - gen_statem:cast(Client, {puback, PacketId, ReasonCode, Properties}). - -pubrec(Client, PacketId) when is_integer(PacketId) -> - pubrec(Client, PacketId, ?RC_SUCCESS). -pubrec(Client, PacketId, ReasonCode) - when is_integer(PacketId), is_integer(ReasonCode) -> - pubrec(Client, PacketId, ReasonCode, #{}). -pubrec(Client, PacketId, ReasonCode, Properties) - when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> - gen_statem:cast(Client, {pubrec, PacketId, ReasonCode, Properties}). - -pubrel(Client, PacketId) when is_integer(PacketId) -> - pubrel(Client, PacketId, ?RC_SUCCESS). -pubrel(Client, PacketId, ReasonCode) - when is_integer(PacketId), is_integer(ReasonCode) -> - pubrel(Client, PacketId, ReasonCode, #{}). -pubrel(Client, PacketId, ReasonCode, Properties) - when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> - gen_statem:cast(Client, {pubrel, PacketId, ReasonCode, Properties}). - -pubcomp(Client, PacketId) when is_integer(PacketId) -> - pubcomp(Client, PacketId, ?RC_SUCCESS). -pubcomp(Client, PacketId, ReasonCode) - when is_integer(PacketId), is_integer(ReasonCode) -> - pubcomp(Client, PacketId, ReasonCode, #{}). -pubcomp(Client, PacketId, ReasonCode, Properties) - when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> - gen_statem:cast(Client, {pubcomp, PacketId, ReasonCode, Properties}). - -subscriptions(Client) -> - gen_statem:call(Client, subscriptions). - -info(Client) -> - gen_statem:call(Client, info). - -stop(Client) -> - gen_statem:call(Client, stop). - -pause(Client) -> - gen_statem:call(Client, pause). - -resume(Client) -> - gen_statem:call(Client, resume). - -%%-------------------------------------------------------------------- -%% gen_statem callbacks -%%-------------------------------------------------------------------- - -init([Options]) -> - process_flag(trap_exit, true), - ClientId = case {proplists:get_value(proto_ver, Options, v4), - proplists:get_value(clientid, Options)} of - {v5, undefined} -> ?NO_CLIENT_ID; - {_ver, undefined} -> random_client_id(); - {_ver, Id} -> iolist_to_binary(Id) - end, - State = init(Options, #state{host = {127,0,0,1}, - port = 1883, - hosts = [], - sock_opts = [], - bridge_mode = false, - clientid = ClientId, - clean_start = true, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - keepalive = ?DEFAULT_KEEPALIVE, - force_ping = false, - paused = false, - will_flag = false, - will_msg = #mqtt_msg{}, - pending_calls = [], - subscriptions = #{}, - max_inflight = infinity, - inflight = #{}, - awaiting_rel = #{}, - properties = #{}, - auto_ack = true, - ack_timeout = ?DEFAULT_ACK_TIMEOUT, - retry_interval = ?DEFAULT_RETRY_INTERVAL, - connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, - last_packet_id = 1 - }), - {ok, initialized, init_parse_state(State)}. - -random_client_id() -> - rand:seed(exsplus, erlang:timestamp()), - I1 = rand:uniform(round(math:pow(2, 48))) - 1, - I2 = rand:uniform(round(math:pow(2, 32))) - 1, - {ok, Host} = inet:gethostname(), - RandId = io_lib:format("~12.16.0b~8.16.0b", [I1, I2]), - iolist_to_binary(["emqtt-", Host, "-", RandId]). - -init([], State) -> - State; -init([{name, Name} | Opts], State) -> - init(Opts, State#state{name = Name}); -init([{owner, Owner} | Opts], State) when is_pid(Owner) -> - link(Owner), - init(Opts, State#state{owner = Owner}); -init([{msg_handler, Hdlr} | Opts], State) -> - init(Opts, State#state{msg_handler = Hdlr}); -init([{host, Host} | Opts], State) -> - init(Opts, State#state{host = Host}); -init([{port, Port} | Opts], State) -> - init(Opts, State#state{port = Port}); -init([{hosts, Hosts} | Opts], State) -> - Hosts1 = - lists:foldl(fun({Host, Port}, Acc) -> - [{Host, Port}|Acc]; - (Host, Acc) -> - [{Host, 1883}|Acc] - end, [], Hosts), - init(Opts, State#state{hosts = Hosts1}); -init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) -> - init(Opts, State#state{sock_opts = merge_opts(SockOpts, TcpOpts)}); -init([{ssl, EnableSsl} | Opts], State) -> - case lists:keytake(ssl_opts, 1, Opts) of - {value, SslOpts, WithOutSslOpts} -> - init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State); - false -> - init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State) - end; -init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) -> - case lists:keytake(ssl, 1, Opts) of - {value, {ssl, true}, WithOutEnableSsl} -> - ok = ssl:start(), - SockOpts1 = merge_opts(SockOpts, [{ssl_opts, SslOpts}]), - init(WithOutEnableSsl, State#state{sock_opts = SockOpts1}); - {value, {ssl, false}, WithOutEnableSsl} -> - init(WithOutEnableSsl, State); - false -> - init(Opts, State) - end; -init([{ws_path, Path} | Opts], State = #state{sock_opts = SockOpts}) -> - init(Opts, State#state{sock_opts = [{ws_path, Path}|SockOpts]}); -init([{clientid, ClientId} | Opts], State) -> - init(Opts, State#state{clientid = iolist_to_binary(ClientId)}); -init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> - init(Opts, State#state{clean_start = CleanStart}); -init([{username, Username} | Opts], State) -> - init(Opts, State#state{username = iolist_to_binary(Username)}); -init([{password, Password} | Opts], State) -> - init(Opts, State#state{password = iolist_to_binary(Password)}); -init([{keepalive, Secs} | Opts], State) -> - init(Opts, State#state{keepalive = Secs}); -init([{proto_ver, v3} | Opts], State) -> - init(Opts, State#state{proto_ver = ?MQTT_PROTO_V3, - proto_name = <<"MQIsdp">>}); -init([{proto_ver, v4} | Opts], State) -> - init(Opts, State#state{proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>}); -init([{proto_ver, v5} | Opts], State) -> - init(Opts, State#state{proto_ver = ?MQTT_PROTO_V5, - proto_name = <<"MQTT">>}); -init([{will_topic, Topic} | Opts], State = #state{will_msg = WillMsg}) -> - WillMsg1 = init_will_msg({topic, Topic}, WillMsg), - init(Opts, State#state{will_flag = true, will_msg = WillMsg1}); -init([{will_props, Properties} | Opts], State = #state{will_msg = WillMsg}) -> - init(Opts, State#state{will_msg = init_will_msg({props, Properties}, WillMsg)}); -init([{will_payload, Payload} | Opts], State = #state{will_msg = WillMsg}) -> - init(Opts, State#state{will_msg = init_will_msg({payload, Payload}, WillMsg)}); -init([{will_retain, Retain} | Opts], State = #state{will_msg = WillMsg}) -> - init(Opts, State#state{will_msg = init_will_msg({retain, Retain}, WillMsg)}); -init([{will_qos, QoS} | Opts], State = #state{will_msg = WillMsg}) -> - init(Opts, State#state{will_msg = init_will_msg({qos, QoS}, WillMsg)}); -init([{connect_timeout, Timeout}| Opts], State) -> - init(Opts, State#state{connect_timeout = timer:seconds(Timeout)}); -init([{ack_timeout, Timeout}| Opts], State) -> - init(Opts, State#state{ack_timeout = timer:seconds(Timeout)}); -init([force_ping | Opts], State) -> - init(Opts, State#state{force_ping = true}); -init([{force_ping, ForcePing} | Opts], State) when is_boolean(ForcePing) -> - init(Opts, State#state{force_ping = ForcePing}); -init([{properties, Properties} | Opts], State = #state{properties = InitProps}) -> - init(Opts, State#state{properties = maps:merge(InitProps, Properties)}); -init([{max_inflight, infinity} | Opts], State) -> - init(Opts, State#state{max_inflight = infinity, - inflight = #{}}); -init([{max_inflight, I} | Opts], State) when is_integer(I) -> - init(Opts, State#state{max_inflight = I, - inflight = #{}}); -init([auto_ack | Opts], State) -> - init(Opts, State#state{auto_ack = true}); -init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) -> - init(Opts, State#state{auto_ack = AutoAck}); -init([{retry_interval, I} | Opts], State) -> - init(Opts, State#state{retry_interval = timer:seconds(I)}); -init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> - init(Opts, State#state{bridge_mode = Mode}); -init([_Opt | Opts], State) -> - init(Opts, State). - -init_will_msg({topic, Topic}, WillMsg) -> - WillMsg#mqtt_msg{topic = iolist_to_binary(Topic)}; -init_will_msg({props, Props}, WillMsg) -> - WillMsg#mqtt_msg{props = Props}; -init_will_msg({payload, Payload}, WillMsg) -> - WillMsg#mqtt_msg{payload = iolist_to_binary(Payload)}; -init_will_msg({retain, Retain}, WillMsg) when is_boolean(Retain) -> - WillMsg#mqtt_msg{retain = Retain}; -init_will_msg({qos, QoS}, WillMsg) -> - WillMsg#mqtt_msg{qos = ?QOS_I(QoS)}. - -init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> - MaxSize = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), - ParseState = emqtt_frame:initial_parse_state( - #{max_size => MaxSize, version => Ver}), - State#state{parse_state = ParseState}. - -merge_opts(Defaults, Options) -> - lists:foldl( - fun({Opt, Val}, Acc) -> - lists:keystore(Opt, 1, Acc, {Opt, Val}); - (Opt, Acc) -> - lists:usort([Opt | Acc]) - end, Defaults, Options). - -callback_mode() -> state_functions. - -initialized({call, From}, connect, State = #state{sock_opts = SockOpts, connect_timeout = Timeout}) -> - case sock_connect(hosts(State), SockOpts, Timeout) of - {ok, Sock} -> - case mqtt_connect(run_sock(State#state{socket = Sock})) of - {ok, NewState} -> - {next_state, waiting_for_connack, - add_call(new_call(connect, From), NewState), [Timeout]}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} - end; - Error = {error, Reason} -> - {stop_and_reply, {shutdown, Reason}, [{reply, From, Error}]} - end; - -initialized(EventType, EventContent, State) -> - handle_event(EventType, EventContent, initialized, State). - -mqtt_connect(State = #state{clientid = ClientId, - clean_start = CleanStart, - bridge_mode = IsBridge, - username = Username, - password = Password, - proto_ver = ProtoVer, - proto_name = ProtoName, - keepalive = KeepAlive, - will_flag = WillFlag, - will_msg = WillMsg, - properties = Properties}) -> - ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, - ConnProps = emqtt_props:filter(?CONNECT, Properties), - send(?CONNECT_PACKET( - #mqtt_packet_connect{proto_ver = ProtoVer, - proto_name = ProtoName, - is_bridge = IsBridge, - clean_start = CleanStart, - will_flag = WillFlag, - will_qos = WillQoS, - will_retain = WillRetain, - keepalive = KeepAlive, - properties = ConnProps, - clientid = ClientId, - will_props = WillProps, - will_topic = WillTopic, - will_payload = WillPayload, - username = Username, - password = Password}), State). - -waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, - SessPresent, - Properties), - State = #state{properties = AllProps, - clientid = ClientId}) -> - case take_call(connect, State) of - {value, #call{from = From}, State1} -> - AllProps1 = case Properties of - undefined -> AllProps; - _ -> maps:merge(AllProps, Properties) - end, - Reply = {ok, Properties}, - State2 = State1#state{clientid = assign_id(ClientId, AllProps1), - properties = AllProps1, - session_present = SessPresent}, - {next_state, connected, ensure_keepalive_timer(State2), - [{reply, From, Reply}]}; - false -> - {stop, bad_connack} - end; - -waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, - _SessPresent, - Properties), - State = #state{proto_ver = ProtoVer}) -> - Reason = reason_code_name(ReasonCode, ProtoVer), - case take_call(connect, State) of - {value, #call{from = From}, _State} -> - Reply = {error, {Reason, Properties}}, - {stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]}; - false -> {stop, connack_error} - end; - -waiting_for_connack(timeout, _Timeout, State) -> - case take_call(connect, State) of - {value, #call{from = From}, _State} -> - Reply = {error, connack_timeout}, - {stop_and_reply, connack_timeout, [{reply, From, Reply}]}; - false -> {stop, connack_timeout} - end; - -waiting_for_connack(EventType, EventContent, State) -> - case take_call(connect, State) of - {value, #call{from = From}, _State} -> - case handle_event(EventType, EventContent, waiting_for_connack, State) of - {stop, Reason, State} -> - Reply = {error, {Reason, EventContent}}, - {stop_and_reply, Reason, [{reply, From, Reply}]}; - StateCallbackResult -> - StateCallbackResult - end; - false -> - {stop, connack_timeout} - end. - -connected({call, From}, subscriptions, #state{subscriptions = Subscriptions}) -> - {keep_state_and_data, [{reply, From, maps:to_list(Subscriptions)}]}; - -connected({call, From}, info, State) -> - Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), - {keep_state_and_data, [{reply, From, Info}]}; - -connected({call, From}, pause, State) -> - {keep_state, State#state{paused = true}, [{reply, From, ok}]}; - -connected({call, From}, resume, State) -> - {keep_state, State#state{paused = false}, [{reply, From, ok}]}; - -connected({call, From}, clientid, #state{clientid = ClientId}) -> - {keep_state_and_data, [{reply, From, ClientId}]}; - -connected({call, From}, SubReq = {subscribe, Properties, Topics}, - State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> - case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of - {ok, NewState} -> - Call = new_call({subscribe, PacketId}, From, SubReq), - Subscriptions1 = - lists:foldl(fun({Topic, Opts}, Acc) -> - maps:put(Topic, Opts, Acc) - end, Subscriptions, Topics), - {keep_state, ensure_ack_timer(add_call(Call,NewState#state{subscriptions = Subscriptions1}))}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} - end; - -connected({call, From}, {publish, Msg = #mqtt_msg{qos = ?QOS_0}}, State) -> - case send(Msg, State) of - {ok, NewState} -> - {keep_state, NewState, [{reply, From, ok}]}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} - end; - -connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}}, - State = #state{inflight = Inflight, last_packet_id = PacketId}) - when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> - Msg1 = Msg#mqtt_msg{packet_id = PacketId}, - case send(Msg1, State) of - {ok, NewState} -> - Inflight1 = maps:put(PacketId, {publish, Msg1, os:timestamp()}, Inflight), - State1 = ensure_retry_timer(NewState#state{inflight = Inflight1}), - Actions = [{reply, From, {ok, PacketId}}], - case is_inflight_full(State1) of - true -> {next_state, inflight_full, State1, Actions}; - false -> {keep_state, State1, Actions} - end; - {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]} - end; - -connected({call, From}, UnsubReq = {unsubscribe, Properties, Topics}, - State = #state{last_packet_id = PacketId}) -> - case send(?UNSUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of - {ok, NewState} -> - Call = new_call({unsubscribe, PacketId}, From, UnsubReq), - {keep_state, ensure_ack_timer(add_call(Call, NewState))}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} - end; - -connected({call, From}, ping, State) -> - case send(?PACKET(?PINGREQ), State) of - {ok, NewState} -> - Call = new_call(ping, From), - {keep_state, ensure_ack_timer(add_call(Call, NewState))}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} - end; - -connected({call, From}, {disconnect, ReasonCode, Properties}, State) -> - case send(?DISCONNECT_PACKET(ReasonCode, Properties), State) of - {ok, NewState} -> - {stop_and_reply, normal, [{reply, From, ok}], NewState}; - Error = {error, Reason} -> - {stop_and_reply, Reason, [{reply, From, Error}]} - end; - -connected(cast, {puback, PacketId, ReasonCode, Properties}, State) -> - send_puback(?PUBACK_PACKET(PacketId, ReasonCode, Properties), State); - -connected(cast, {pubrec, PacketId, ReasonCode, Properties}, State) -> - send_puback(?PUBREC_PACKET(PacketId, ReasonCode, Properties), State); - -connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) -> - send_puback(?PUBREL_PACKET(PacketId, ReasonCode, Properties), State); - -connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> - send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); - -connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) -> - keep_state_and_data; - -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> - {keep_state, deliver(packet_to_msg(Packet), State)}; - -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> - publish_process(?QOS_1, Packet, State); - -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> - publish_process(?QOS_2, Packet, State); - -connected(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> - {keep_state, delete_inflight(PubAck, State)}; - -connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -> - NState = case maps:find(PacketId, Inflight) of - {ok, {publish, _Msg, _Ts}} -> - Inflight1 = maps:put(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), - State#state{inflight = Inflight1}; - {ok, {pubrel, _Ref, _Ts}} -> - lager:notice("[emqtt] Duplicated PUBREC Packet: ~p, client_id: ~p", [PacketId, State#state.clientid]), - State; - error -> - lager:warning("[emqtt] Unexpected PUBREC Packet: ~p, client_id: ~p", [PacketId, State#state.clientid]), - State - end, - send_puback(?PUBREL_PACKET(PacketId), NState); - -connected(cast, ?PUBREC_PACKET(PacketId, ReasonCode), State) -> - lager:notice("[emqtt] Duplicated PUBREC Packet: ~p, reason_code: ~p, client_id: ~p", [PacketId, ReasonCode, State#state.clientid]), - keep_state_and_data; - -%%TODO::... if auto_ack is false, should we take PacketId from the map? -connected(cast, ?PUBREL_PACKET(PacketId), - State = #state{awaiting_rel = AwaitingRel, auto_ack = AutoAck}) -> - case maps:take(PacketId, AwaitingRel) of - {Packet, AwaitingRel1} -> - NewState = deliver(packet_to_msg(Packet), State#state{awaiting_rel = AwaitingRel1}), - case AutoAck of - true -> send_puback(?PUBCOMP_PACKET(PacketId), NewState); - false -> {keep_state, NewState} - end; - error -> - lager:warning("[emqtt] Unexpected PUBREL: ~p, client_id: ~p", [PacketId, State#state.clientid]), - keep_state_and_data - end; - -connected(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> - {keep_state, delete_inflight(PubComp, State)}; - -connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), - State = #state{subscriptions = _Subscriptions}) -> - case take_call({subscribe, PacketId}, State) of - {value, #call{from = From}, NewState} -> - %%TODO: Merge reason codes to subscriptions? - Reply = {ok, Properties, ReasonCodes}, - {keep_state, NewState, [{reply, From, Reply}]}; - false -> - keep_state_and_data - end; - -connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), - State = #state{subscriptions = Subscriptions}) -> - case take_call({unsubscribe, PacketId}, State) of - {value, #call{from = From, req = {_, _, Topics}}, NewState} -> - Subscriptions1 = - lists:foldl(fun(Topic, Acc) -> - maps:remove(Topic, Acc) - end, Subscriptions, Topics), - {keep_state, NewState#state{subscriptions = Subscriptions1}, - [{reply, From, {ok, Properties, ReasonCodes}}]}; - false -> - keep_state_and_data - end; - -connected(cast, ?PACKET(?PINGRESP), #state{pending_calls = []}) -> - keep_state_and_data; -connected(cast, ?PACKET(?PINGRESP), State) -> - case take_call(ping, State) of - {value, #call{from = From}, NewState} -> - {keep_state, NewState, [{reply, From, pong}]}; - false -> - keep_state_and_data - end; - -connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> - {stop, {disconnected, ReasonCode, Properties}, State}; - -connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) -> - case send(?PACKET(?PINGREQ), State) of - {ok, NewState} -> - {keep_state, ensure_keepalive_timer(NewState)}; - Error -> {stop, Error} - end; - -connected(info, {timeout, TRef, keepalive}, State = #state{socket = Sock, paused = Paused, keepalive_timer = TRef}) -> - case (not Paused) andalso should_ping(Sock) of - true -> - case send(?PACKET(?PINGREQ), State) of - {ok, NewState} -> - {ok, [{send_oct, Val}]} = emqtt_sock:getstat(Sock, [send_oct]), - put(send_oct, Val), - {keep_state, ensure_keepalive_timer(NewState), [hibernate]}; - Error -> {stop, Error} - end; - false -> - {keep_state, ensure_keepalive_timer(State), [hibernate]}; - {error, Reason} -> - {stop, Reason} - end; - -connected(info, {timeout, TRef, ack}, State = #state{ack_timer = TRef, - ack_timeout = Timeout, - pending_calls = Calls}) -> - NewState = State#state{ack_timer = undefined, - pending_calls = timeout_calls(Timeout, Calls)}, - {keep_state, ensure_ack_timer(NewState)}; - -connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, inflight = Inflight}) -> - case maps:size(Inflight) == 0 of - true -> {keep_state, State#state{retry_timer = undefined}}; - false -> retry_send(State) - end; - -connected(EventType, EventContent, Data) -> - handle_event(EventType, EventContent, connected, Data). - -inflight_full({call, _From}, {publish, #mqtt_msg{qos = QoS}}, _State) when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) -> - {keep_state_and_data, [postpone]}; -inflight_full(cast, ?PUBACK_PACKET(_PacketId, _ReasonCode, _Properties) = PubAck, State) -> - delete_inflight_when_full(PubAck, State); -inflight_full(cast, ?PUBCOMP_PACKET(_PacketId, _ReasonCode, _Properties) = PubComp, State) -> - delete_inflight_when_full(PubComp, State); -inflight_full(EventType, EventContent, Data) -> - %% inflight_full is a sub-state of connected state, - %% delegate all other events to connected state. - connected(EventType, EventContent, Data). - -handle_event({call, From}, stop, _StateName, _State) -> - {stop_and_reply, normal, [{reply, From, ok}]}; - -handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> - %lager:debug("[emqtt] RECV Data: ~p, client_id: ~p", [Data, State#state.clientid]), - process_incoming(Data, [], run_sock(State)); - -handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> - lager:error("[emqtt] The connection error occured ~p, reason:~p, client_id: ~p", [Error, Reason, State#state.clientid]), - {stop, {shutdown, Reason}, State}; - -handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - lager:debug("[emqtt] sokcet closed: ~p, client_id: ~p", [Closed, State#state.clientid]), - {stop, {shutdown, Closed}, State}; - -handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) -> - lager:warning("[emqtt] Got EXIT from owner, Reason: ~p, client_id: ~p", [Reason, State#state.clientid]), - {stop, {shutdown, Reason}, State}; - -handle_event(info, {inet_reply, _Sock, ok}, _, _State) -> - keep_state_and_data; - -handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) -> - lager:error("[emqtt] Got tcp error: ~p, client_id: ~p", [Reason, State#state.clientid]), - {stop, {shutdown, Reason}, State}; - -handle_event(info, EventContent = {'EXIT', Pid, Reason}, StateName, State) -> - lager:warning("[emqtt] State: ~s, Unexpected Event: (info, ~p), from pid: ~p, client_id: ~p", [StateName, EventContent, Pid, State#state.clientid]), - {stop, {shutdown, Reason}, State}; - -handle_event(EventType, EventContent, StateName, State) -> - lager:error("[emqtt] State: ~s, Unexpected Event: (~p, ~p), client_id: ~p", [StateName, EventType, EventContent, State#state.clientid]), - keep_state_and_data. - -%% Mandatory callback functions -terminate(Reason, _StateName, State = #state{socket = Socket}) -> - case Reason of - {disconnected, ReasonCode, Properties} -> - %% backward compatible - ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}); - _ -> - ok = eval_msg_handler(State, disconnected, Reason) - end, - case Socket =:= undefined of - true -> ok; - _ -> emqtt_sock:close(Socket) - end. - -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -should_ping(Sock) -> - case emqtt_sock:getstat(Sock, [send_oct]) of - {ok, [{send_oct, Val}]} -> - OldVal = get(send_oct), put(send_oct, Val), - OldVal == undefined orelse OldVal == Val; - Error = {error, _Reason} -> - Error - end. - -is_inflight_full(#state{max_inflight = infinity}) -> - false; -is_inflight_full(#state{max_inflight = MaxLimit, inflight = Inflight}) -> - maps:size(Inflight) >= MaxLimit. - -delete_inflight(?PUBACK_PACKET(PacketId, ReasonCode, Properties), - State = #state{inflight = Inflight}) -> - case maps:find(PacketId, Inflight) of - {ok, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> - ok = eval_msg_handler(State, puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}), - State#state{inflight = maps:remove(PacketId, Inflight)}; - error -> - lager:warning("[emqtt] Unexpected PUBACK: ~p, client_id: ~p", [PacketId, State#state.clientid]), - State - end; -delete_inflight(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), - State = #state{inflight = Inflight}) -> - case maps:find(PacketId, Inflight) of - {ok, {pubrel, _PacketId, _Ts}} -> - ok = eval_msg_handler(State, puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}), - State#state{inflight = maps:remove(PacketId, Inflight)}; - error -> - lager:warning("[emqtt] Unexpected PUBCOMP Packet: ~p, client_id: ~p", [PacketId, State#state.clientid]), - State - end. - -delete_inflight_when_full(Packet, State) -> - State1 = delete_inflight(Packet, State), - case is_inflight_full(State1) of - true -> - {keep_state, State1}; - false -> - {next_state, connected, State1} - end. - -assign_id(?NO_CLIENT_ID, Props) -> - case maps:find('Assigned-Client-Identifier', Props) of - {ok, Value} -> - Value; - _ -> - error(bad_client_id) - end; -assign_id(Id, _Props) -> - Id. - -publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), - State0 = #state{auto_ack = AutoAck}) -> - State = deliver(packet_to_msg(Packet), State0), - case AutoAck of - true -> - send_puback(?PUBACK_PACKET(PacketId), State); - false -> - {keep_state, State} - end; -publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), - State = #state{awaiting_rel = AwaitingRel}) -> - case send_puback(?PUBREC_PACKET(PacketId), State) of - {keep_state, NewState} -> - AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel), - {keep_state, NewState#state{awaiting_rel = AwaitingRel1}}; - Stop -> - Stop - end. - -ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> - ensure_keepalive_timer(timer:seconds(Secs), State#state{keepalive = Secs}); -ensure_keepalive_timer(State = #state{keepalive = 0}) -> - State; -ensure_keepalive_timer(State = #state{keepalive = I}) -> - ensure_keepalive_timer(timer:seconds(I), State). -ensure_keepalive_timer(I, State) when is_integer(I) -> - State#state{keepalive_timer = erlang:start_timer(I, self(), keepalive)}. - -new_call(Id, From) -> - new_call(Id, From, undefined). -new_call(Id, From, Req) -> - #call{id = Id, from = From, req = Req, ts = os:timestamp()}. - -add_call(Call, Data = #state{pending_calls = Calls}) -> - Data#state{pending_calls = [Call | Calls]}. - -take_call(Id, Data = #state{pending_calls = Calls}) -> - case lists:keytake(Id, #call.id, Calls) of - {value, Call, Left} -> - {value, Call, Data#state{pending_calls = Left}}; - false -> false - end. - -timeout_calls(Timeout, Calls) -> - timeout_calls(os:timestamp(), Timeout, Calls). -timeout_calls(Now, Timeout, Calls) -> - lists:foldl(fun(C = #call{from = From, ts = Ts}, Acc) -> - case (timer:now_diff(Now, Ts) div 1000) >= Timeout of - true -> - gen_statem:reply(From, {error, ack_timeout}), - Acc; - false -> [C | Acc] - end - end, [], Calls). - -ensure_ack_timer(State = #state{ack_timer = undefined, - ack_timeout = Timeout, - pending_calls = Calls}) when length(Calls) > 0 -> - State#state{ack_timer = erlang:start_timer(Timeout, self(), ack)}; -ensure_ack_timer(State) -> State. - -ensure_retry_timer(State = #state{retry_interval = Interval}) -> - do_ensure_retry_timer(Interval, State). - -do_ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) - when Interval > 0 -> - State#state{retry_timer = erlang:start_timer(Interval, self(), retry)}; -do_ensure_retry_timer(_Interval, State) -> - State. - -retry_send(State = #state{inflight = Inflight}) -> - SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, - Msgs = lists:sort(SortFun, maps:values(Inflight)), - retry_send(Msgs, os:timestamp(), State ). - -retry_send([], _Now, State) -> - {keep_state, ensure_retry_timer(State)}; -retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interval}) -> - Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms - case (Diff >= Interval) of - true -> case retry_send(Type, Msg, Now, State) of - {ok, NewState} -> retry_send(Msgs, Now, NewState); - {error, Error} -> {stop, Error} - end; - false -> {keep_state, do_ensure_retry_timer(Interval - Diff, State)} - end. - -retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, - Now, State = #state{inflight = Inflight}) -> - Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)}, - case send(Msg1, State) of - {ok, NewState} -> - Inflight1 = maps:put(PacketId, {publish, Msg1, Now}, Inflight), - {ok, NewState#state{inflight = Inflight1}}; - Error = {error, _Reason} -> - Error - end; -retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> - case send(?PUBREL_PACKET(PacketId), State) of - {ok, NewState} -> - Inflight1 = maps:put(PacketId, {pubrel, PacketId, Now}, Inflight), - {ok, NewState#state{inflight = Inflight1}}; - Error = {error, _Reason} -> - Error - end. - -deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, - topic = Topic, props = Props, payload = Payload}, - State) -> - Msg = #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, - topic => Topic, properties => Props, payload => Payload, - client_pid => self()}, - ok = eval_msg_handler(State, publish, Msg), - State. - -eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, - owner = Owner}, - disconnected, {ReasonCode, Properties}) when is_integer(ReasonCode) -> - %% Special handling for disconnected message when there is no handler callback - Owner ! {disconnected, ReasonCode, Properties}, - ok; -eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR}, - disconnected, _OtherReason) -> - %% do nothing to be backward compatible - ok; -eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, - owner = Owner}, Kind, Msg) -> - Owner ! {Kind, Msg}, - ok; -eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) -> - F = maps:get(Kind, Handler), - _ = apply_handler_function(F, Msg), - ok. - -apply_handler_function(F, Msg) - when is_function(F) -> - erlang:apply(F, [Msg]); -apply_handler_function({F, A}, Msg) - when is_function(F), - is_list(A) -> - erlang:apply(F, [Msg] ++ A); -apply_handler_function({M, F, A}, Msg) - when is_atom(M), - is_atom(F), - is_list(A) -> - erlang:apply(M, F, [Msg] ++ A). - -packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - dup = Dup, - qos = QoS, - retain = R}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = Props}, - payload = Payload}) -> - #mqtt_msg{qos = QoS, retain = R, dup = Dup, packet_id = PacketId, - topic = Topic, props = Props, payload = Payload}. - -msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, - topic = Topic, props = Props, payload = Payload}) -> - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = QoS, - retain = Retain, - dup = Dup}, - variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = Props}, - payload = Payload}. - -%%-------------------------------------------------------------------- -%% Socket Connect/Send - -sock_connect(Hosts, SockOpts, Timeout) -> - sock_connect(Hosts, SockOpts, Timeout, {error, no_hosts}). - -sock_connect([], _SockOpts, _Timeout, LastErr) -> - LastErr; -sock_connect([{Host, Port} | Hosts], SockOpts, Timeout, _LastErr) -> - case emqtt_sock:connect(Host, Port, SockOpts, Timeout) of - {ok, SockOrPid} -> - {ok, SockOrPid}; - Error = {error, _Reason} -> - sock_connect(Hosts, SockOpts, Timeout, Error) - end. - -hosts(#state{hosts = [], host = Host, port = Port}) -> - [{Host, Port}]; -hosts(#state{hosts = Hosts}) -> Hosts. - -send_puback(Packet, State) -> - case send(Packet, State) of - {ok, NewState} -> {keep_state, NewState}; - {error, Reason} -> {stop, {shutdown, Reason}} - end. - -send(Msg, State) when is_record(Msg, mqtt_msg) -> - send(msg_to_packet(Msg), State); - -send(Packet, State = #state{socket = Sock, proto_ver = Ver}) - when is_record(Packet, mqtt_packet) -> - Data = emqtt_frame:serialize(Packet, Ver), - case emqtt_sock:send(Sock, Data) of - ok -> - {ok, bump_last_packet_id(State)}; - Error -> - Error - end. - -run_sock(State = #state{socket = Sock}) -> - emqtt_sock:setopts(Sock, [{active, once}]), State. - -%%-------------------------------------------------------------------- -%% Process incomming - -process_incoming(<<>>, Packets, State) -> - {keep_state, State, next_events(Packets)}; - -process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> - try emqtt_frame:parse(Bytes, ParseState) of - {ok, Packet, Rest, NParseState} -> - process_incoming(Rest, [Packet|Packets], State#state{parse_state = NParseState}); - {more, NParseState} -> - {keep_state, State#state{parse_state = NParseState}, next_events(Packets)} - catch - error:Error -> - {stop, Error} - end. - --compile({inline, [next_events/1]}). -next_events([]) -> []; -next_events([Packet]) -> - {next_event, cast, Packet}; -next_events(Packets) -> - [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. - -%%-------------------------------------------------------------------- -%% packet_id generation - -bump_last_packet_id(State = #state{last_packet_id = Id}) -> - State#state{last_packet_id = next_packet_id(Id)}. - --spec next_packet_id(packet_id()) -> packet_id(). -next_packet_id(?MAX_PACKET_ID) -> - 1; -next_packet_id(Id) -> - Id + 1. - -%%-------------------------------------------------------------------- -%% ReasonCode Name - -reason_code_name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> - reason_code_name(I); -reason_code_name(0, _Ver) -> connection_accepted; -reason_code_name(1, _Ver) -> unacceptable_protocol_version; -reason_code_name(2, _Ver) -> client_identifier_not_valid; -reason_code_name(3, _Ver) -> server_unavaliable; -reason_code_name(4, _Ver) -> malformed_username_or_password; -reason_code_name(5, _Ver) -> unauthorized_client; -reason_code_name(_, _Ver) -> unknown_error. - -reason_code_name(16#00) -> success; -reason_code_name(16#01) -> granted_qos1; -reason_code_name(16#02) -> granted_qos2; -reason_code_name(16#04) -> disconnect_with_will_message; -reason_code_name(16#10) -> no_matching_subscribers; -reason_code_name(16#11) -> no_subscription_existed; -reason_code_name(16#18) -> continue_authentication; -reason_code_name(16#19) -> re_authenticate; -reason_code_name(16#80) -> unspecified_error; -reason_code_name(16#81) -> malformed_Packet; -reason_code_name(16#82) -> protocol_error; -reason_code_name(16#83) -> implementation_specific_error; -reason_code_name(16#84) -> unsupported_protocol_version; -reason_code_name(16#85) -> client_identifier_not_valid; -reason_code_name(16#86) -> bad_username_or_password; -reason_code_name(16#87) -> not_authorized; -reason_code_name(16#88) -> server_unavailable; -reason_code_name(16#89) -> server_busy; -reason_code_name(16#8A) -> banned; -reason_code_name(16#8B) -> server_shutting_down; -reason_code_name(16#8C) -> bad_authentication_method; -reason_code_name(16#8D) -> keepalive_timeout; -reason_code_name(16#8E) -> session_taken_over; -reason_code_name(16#8F) -> topic_filter_invalid; -reason_code_name(16#90) -> topic_name_invalid; -reason_code_name(16#91) -> packet_identifier_inuse; -reason_code_name(16#92) -> packet_identifier_not_found; -reason_code_name(16#93) -> receive_maximum_exceeded; -reason_code_name(16#94) -> topic_alias_invalid; -reason_code_name(16#95) -> packet_too_large; -reason_code_name(16#96) -> message_rate_too_high; -reason_code_name(16#97) -> quota_exceeded; -reason_code_name(16#98) -> administrative_action; -reason_code_name(16#99) -> payload_format_invalid; -reason_code_name(16#9A) -> retain_not_supported; -reason_code_name(16#9B) -> qos_not_supported; -reason_code_name(16#9C) -> use_another_server; -reason_code_name(16#9D) -> server_moved; -reason_code_name(16#9E) -> shared_subscriptions_not_supported; -reason_code_name(16#9F) -> connection_rate_exceeded; -reason_code_name(16#A0) -> maximum_connect_time; -reason_code_name(16#A1) -> subscription_identifiers_not_supported; -reason_code_name(16#A2) -> wildcard_subscriptions_not_supported; -reason_code_name(_Code) -> unknown_error. diff --git a/apps/iot/src/emqtt/emqtt_frame.erl b/apps/iot/src/emqtt/emqtt_frame.erl deleted file mode 100644 index 96393f3..0000000 --- a/apps/iot/src/emqtt/emqtt_frame.erl +++ /dev/null @@ -1,738 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqtt_frame). - --include("emqtt.hrl"). - --export([initial_parse_state/0, initial_parse_state/1]). - --export([parse/1, parse/2, serialize_fun/0, serialize_fun/1, serialize/1, serialize/2 ]). - --export_type([options/0, parse_state/0, parse_result/0, serialize_fun/0]). - --type(version() :: ?MQTT_PROTO_V3 - | ?MQTT_PROTO_V4 - | ?MQTT_PROTO_V5). - --type(options() :: #{strict_mode => boolean(), - max_size => 1..?MAX_PACKET_SIZE, - version => version()}). - --opaque(parse_state() :: {none, options()} | cont_fun()). - --opaque(parse_result() :: {more, cont_fun()} - | {ok, #mqtt_packet{}, binary(), parse_state()}). - --type(cont_fun() :: fun((binary()) -> parse_result())). - --type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). - --define(none(Options), {none, Options}). - --define(DEFAULT_OPTIONS, - #{strict_mode => false, - max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 - }). - -%%-------------------------------------------------------------------- -%% Init Parse State -%%-------------------------------------------------------------------- - --spec(initial_parse_state() -> {none, options()}). -initial_parse_state() -> - initial_parse_state(#{}). - --spec(initial_parse_state(options()) -> {none, options()}). -initial_parse_state(Options) when is_map(Options) -> - ?none(merge_opts(Options)). - -%% @pivate -merge_opts(Options) -> - maps:merge(?DEFAULT_OPTIONS, Options). - -%%-------------------------------------------------------------------- -%% Parse MQTT Frame -%%-------------------------------------------------------------------- - --spec(parse(binary()) -> parse_result()). -parse(Bin) -> - parse(Bin, initial_parse_state()). - --spec(parse(binary(), parse_state()) -> parse_result()). -parse(<<>>, {none, Options}) -> - {more, fun(Bin) -> parse(Bin, {none, Options}) end}; -parse(<>, - {none, Options = #{strict_mode := StrictMode}}) -> - %% Validate header if strict mode. - StrictMode andalso validate_header(Type, Dup, QoS, Retain), - Header = #mqtt_packet_header{type = Type, - dup = bool(Dup), - qos = QoS, - retain = bool(Retain) - }, - Header1 = case fixqos(Type, QoS) of - QoS -> Header; - FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} - end, - parse_remaining_len(Rest, Header1, Options); -parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> - Cont(Bin). - -parse_remaining_len(<<>>, Header, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; -parse_remaining_len(Rest, Header, Options) -> - parse_remaining_len(Rest, Header, 1, 0, Options). - -parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> - error(frame_too_large); -parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; -%% Match DISCONNECT without payload -parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> - Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), - {ok, Packet, Rest, ?none(Options)}; -%% Match PINGREQ. -parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame(Rest, Header, 0, Options); -%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... -parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame(Rest, Header, 2, Options); -parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> - parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); -parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, - Options = #{max_size := MaxSize}) -> - FrameLen = Value + Len * Multiplier, - if - FrameLen > MaxSize -> - error(frame_too_large); - true -> - parse_frame(Rest, Header, FrameLen, Options) - end. - -parse_frame(Bin, Header, 0, Options) -> - {ok, packet(Header), Bin, ?none(Options)}; - -parse_frame(Bin, Header, Length, Options) -> - case Bin of - <> -> - case parse_packet(Header, FrameBin, Options) of - {Variable, Payload} -> - {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; - Variable = #mqtt_packet_connect{proto_ver = Ver} -> - {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; - Variable -> - {ok, packet(Header, Variable), Rest, ?none(Options)} - end; - TooShortBin -> - {more, fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end} - end. - --compile({inline, [packet/1, packet/2, packet/3]}). -packet(Header) -> - #mqtt_packet{header = Header}. -packet(Header, Variable) -> - #mqtt_packet{header = Header, variable = Variable}. -packet(Header, Variable, Payload) -> - #mqtt_packet{header = Header, variable = Variable, payload = Payload}. - -parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> - {ProtoName, Rest} = parse_utf8_string(FrameBin), - <> = Rest, - % Note: Crash when reserved flag doesn't equal to 0, there is no strict - % compliance with the MQTT5.0. - <> = Rest1, - - {Properties, Rest3} = parse_properties(Rest2, ProtoVer), - {ClientId, Rest4} = parse_utf8_string(Rest3), - ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = (BridgeTag =:= 8), - clean_start = bool(CleanStart), - will_flag = bool(WillFlag), - will_qos = WillQoS, - will_retain = bool(WillRetain), - keepalive = KeepAlive, - properties = Properties, - clientid = ClientId - }, - {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), - {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), - {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), - ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword}; - -parse_packet(#mqtt_packet_header{type = ?CONNACK}, - <>, #{version := Ver}) -> - {Properties, <<>>} = parse_properties(Rest, Ver), - #mqtt_packet_connack{ack_flags = AckFlags, - reason_code = ReasonCode, - properties = Properties - }; - -parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, - #{strict_mode := StrictMode, version := Ver}) -> - {TopicName, Rest} = parse_utf8_string(Bin), - {PacketId, Rest1} = case QoS of - ?QOS_0 -> {undefined, Rest}; - _ -> parse_packet_id(Rest) - end, - (PacketId =/= undefined) andalso - StrictMode andalso validate_packet_id(PacketId), - {Properties, Payload} = parse_properties(Rest1, Ver), - Publish = #mqtt_packet_publish{topic_name = TopicName, - packet_id = PacketId, - properties = Properties - }, - {Publish, Payload}; - -parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode}) - when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> - StrictMode andalso validate_packet_id(PacketId), - #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; - -parse_packet(#mqtt_packet_header{type = PubAck}, <>, - #{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5}) - when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> - StrictMode andalso validate_packet_id(PacketId), - {Properties, <<>>} = parse_properties(Rest, Ver), - #mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties - }; - -parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, - #{strict_mode := StrictMode, version := Ver}) -> - StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), - TopicFilters = parse_topic_filters(subscribe, Rest1), - ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]), - #mqtt_packet_subscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters - }; - -parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, - #{strict_mode := StrictMode, version := Ver}) -> - StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), - ReasonCodes = parse_reason_codes(Rest1), - #mqtt_packet_suback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes - }; - -parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <>, - #{strict_mode := StrictMode, version := Ver}) -> - StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), - TopicFilters = parse_topic_filters(unsubscribe, Rest1), - #mqtt_packet_unsubscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters - }; - -parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, - #{strict_mode := StrictMode}) -> - StrictMode andalso validate_packet_id(PacketId), - #mqtt_packet_unsuback{packet_id = PacketId}; - -parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, - #{strict_mode := StrictMode, version := Ver}) -> - StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), - ReasonCodes = parse_reason_codes(Rest1), - #mqtt_packet_unsuback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes - }; - -parse_packet(#mqtt_packet_header{type = ?DISCONNECT}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), - #mqtt_packet_disconnect{reason_code = ReasonCode, - properties = Properties - }; - -parse_packet(#mqtt_packet_header{type = ?AUTH}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), - #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}. - -parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, - proto_ver = Ver}, Bin) -> - {Props, Rest} = parse_properties(Bin, Ver), - {Topic, Rest1} = parse_utf8_string(Rest), - {Payload, Rest2} = parse_binary_data(Rest1), - {Packet#mqtt_packet_connect{will_props = Props, - will_topic = Topic, - will_payload = Payload - }, Rest2}; -parse_will_message(Packet, Bin) -> {Packet, Bin}. - --compile({inline, [parse_packet_id/1]}). -parse_packet_id(<>) -> - {PacketId, Rest}. - -parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> - {undefined, Bin}; -%% TODO: version mess? -parse_properties(<<>>, ?MQTT_PROTO_V5) -> - {#{}, <<>>}; -parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) -> - {#{}, Rest}; -parse_properties(Bin, ?MQTT_PROTO_V5) -> - {Len, Rest} = parse_variable_byte_integer(Bin), - <> = Rest, - {parse_property(PropsBin, #{}), Rest1}. - -parse_property(<<>>, Props) -> - Props; -parse_property(<<16#01, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}); -parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}); -parse_property(<<16#03, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Content-Type' => Val}); -parse_property(<<16#08, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Topic' => Val}); -parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Correlation-Data' => Val}); -parse_property(<<16#0B, Bin/binary>>, Props) -> - {Val, Rest} = parse_variable_byte_integer(Bin), - parse_property(Rest, Props#{'Subscription-Identifier' => Val}); -parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); -parse_property(<<16#12, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); -parse_property(<<16#13, Val:16, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); -parse_property(<<16#15, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Authentication-Method' => Val}); -parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Authentication-Data' => Val}); -parse_property(<<16#17, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Problem-Information' => Val}); -parse_property(<<16#18, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Will-Delay-Interval' => Val}); -parse_property(<<16#19, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Response-Information' => Val}); -parse_property(<<16#1A, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Information' => Val}); -parse_property(<<16#1C, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Server-Reference' => Val}); -parse_property(<<16#1F, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Reason-String' => Val}); -parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Receive-Maximum' => Val}); -parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}); -parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias' => Val}); -parse_property(<<16#24, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-QoS' => Val}); -parse_property(<<16#25, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Retain-Available' => Val}); -parse_property(<<16#26, Bin/binary>>, Props) -> - {Pair, Rest} = parse_utf8_pair(Bin), - case maps:find('User-Property', Props) of - {ok, UserProps} -> - UserProps1 = lists:append(UserProps, [Pair]), - parse_property(Rest, Props#{'User-Property' := UserProps1}); - error -> - parse_property(Rest, Props#{'User-Property' => [Pair]}) - end; -parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}); -parse_property(<<16#28, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); -parse_property(<<16#29, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); -parse_property(<<16#2A, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). - -parse_variable_byte_integer(Bin) -> - parse_variable_byte_integer(Bin, 1, 0). -parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> - parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); -parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> - {Value + Len * Multiplier, Rest}. - -parse_topic_filters(subscribe, Bin) -> - [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}} - || <> <= Bin]; - -parse_topic_filters(unsubscribe, Bin) -> - [Topic || <> <= Bin]. - -parse_reason_codes(Bin) -> - [Code || <> <= Bin]. - -parse_utf8_pair(<>) -> - {{Key, Val}, Rest}. - -parse_utf8_string(Bin, false) -> - {undefined, Bin}; -parse_utf8_string(Bin, true) -> - parse_utf8_string(Bin). - -parse_utf8_string(<>) -> - {Str, Rest}. - -parse_binary_data(<>) -> - {Data, Rest}. - -%%-------------------------------------------------------------------- -%% Serialize MQTT Packet -%%-------------------------------------------------------------------- - -serialize_fun() -> serialize_fun(?DEFAULT_OPTIONS). - -serialize_fun(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> - MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), - serialize_fun(#{version => ProtoVer, max_size => MaxSize}); - -serialize_fun(#{version := Ver, max_size := MaxSize}) -> - fun(Packet) -> - IoData = serialize(Packet, Ver), - case is_too_large(IoData, MaxSize) of - true -> <<>>; - false -> IoData - end - end. - --spec(serialize(#mqtt_packet{}) -> iodata()). -serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4). - --spec(serialize(#mqtt_packet{}, version()) -> iodata()). -serialize(#mqtt_packet{header = Header, - variable = Variable, - payload = Payload}, Ver) -> - serialize(Header, serialize_variable(Variable, Ver), serialize_payload(Payload)). - -serialize(#mqtt_packet_header{type = Type, - dup = Dup, - qos = QoS, - retain = Retain - }, VariableBin, PayloadBin) - when ?CONNECT =< Type andalso Type =< ?AUTH -> - Len = iolist_size(VariableBin) + iolist_size(PayloadBin), - [<>, - serialize_remaining_len(Len), VariableBin, PayloadBin]. - -serialize_variable(#mqtt_packet_connect{ - proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - clean_start = CleanStart, - will_flag = WillFlag, - will_qos = WillQoS, - will_retain = WillRetain, - keepalive = KeepAlive, - properties = Properties, - clientid = ClientId, - will_props = WillProps, - will_topic = WillTopic, - will_payload = WillPayload, - username = Username, - password = Password}, _Ver) -> - [serialize_binary_data(ProtoName), - <<(case IsBridge of - true -> 16#80 + ProtoVer; - false -> ProtoVer - end):8, - (flag(Username)):1, - (flag(Password)):1, - (flag(WillRetain)):1, - WillQoS:2, - (flag(WillFlag)):1, - (flag(CleanStart)):1, - 0:1, - KeepAlive:16/big-unsigned-integer>>, - serialize_properties(Properties, ProtoVer), - serialize_utf8_string(ClientId), - case WillFlag of - true -> [serialize_properties(WillProps, ProtoVer), - serialize_utf8_string(WillTopic), - serialize_binary_data(WillPayload)]; - false -> <<>> - end, - serialize_utf8_string(Username, true), - serialize_utf8_string(Password, true)]; - -serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags, - reason_code = ReasonCode, - properties = Properties}, Ver) -> - [AckFlags, ReasonCode, serialize_properties(Properties, Ver)]; - -serialize_variable(#mqtt_packet_publish{topic_name = TopicName, - packet_id = PacketId, - properties = Properties}, Ver) -> - [serialize_utf8_string(TopicName), - if - PacketId =:= undefined -> <<>>; - true -> <> - end, - serialize_properties(Properties, Ver)]; - -serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, Ver) - when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> - <>; -serialize_variable(#mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties - }, - Ver = ?MQTT_PROTO_V5) -> - [<>, ReasonCode, - serialize_properties(Properties, Ver)]; - -serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters}, Ver) -> - [<>, serialize_properties(Properties, Ver), - serialize_topic_filters(subscribe, TopicFilters, Ver)]; - -serialize_variable(#mqtt_packet_suback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes}, Ver) -> - [<>, serialize_properties(Properties, Ver), - serialize_reason_codes(ReasonCodes)]; - -serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters}, Ver) -> - [<>, serialize_properties(Properties, Ver), - serialize_topic_filters(unsubscribe, TopicFilters, Ver)]; - -serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes}, Ver) -> - [<>, serialize_properties(Properties, Ver), - serialize_reason_codes(ReasonCodes)]; - -serialize_variable(#mqtt_packet_disconnect{}, Ver) - when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> - <<>>; - -serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode, - properties = Properties}, - Ver = ?MQTT_PROTO_V5) -> - [ReasonCode, serialize_properties(Properties, Ver)]; -serialize_variable(#mqtt_packet_disconnect{}, _Ver) -> - <<>>; - -serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode, - properties = Properties}, - Ver = ?MQTT_PROTO_V5) -> - [ReasonCode, serialize_properties(Properties, Ver)]; - -serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) -> - <>; -serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) -> - <>; -serialize_variable(undefined, _Ver) -> - <<>>. - -serialize_payload(undefined) -> <<>>; -serialize_payload(Bin) -> Bin. - -serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 -> - <<>>; -serialize_properties(Props, ?MQTT_PROTO_V5) -> - serialize_properties(Props). - -serialize_properties(undefined) -> - <<0>>; -serialize_properties(Props) when map_size(Props) == 0 -> - <<0>>; -serialize_properties(Props) when is_map(Props) -> - Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>, - [serialize_variable_byte_integer(byte_size(Bin)), Bin]. - -serialize_property(_, undefined) -> - <<>>; -serialize_property('Payload-Format-Indicator', Val) -> - <<16#01, Val>>; -serialize_property('Message-Expiry-Interval', Val) -> - <<16#02, Val:32/big>>; -serialize_property('Content-Type', Val) -> - <<16#03, (serialize_utf8_string(Val))/binary>>; -serialize_property('Response-Topic', Val) -> - <<16#08, (serialize_utf8_string(Val))/binary>>; -serialize_property('Correlation-Data', Val) -> - <<16#09, (byte_size(Val)):16, Val/binary>>; -serialize_property('Subscription-Identifier', Val) -> - <<16#0B, (serialize_variable_byte_integer(Val))/binary>>; -serialize_property('Session-Expiry-Interval', Val) -> - <<16#11, Val:32/big>>; -serialize_property('Assigned-Client-Identifier', Val) -> - <<16#12, (serialize_utf8_string(Val))/binary>>; -serialize_property('Server-Keep-Alive', Val) -> - <<16#13, Val:16/big>>; -serialize_property('Authentication-Method', Val) -> - <<16#15, (serialize_utf8_string(Val))/binary>>; -serialize_property('Authentication-Data', Val) -> - <<16#16, (iolist_size(Val)):16, Val/binary>>; -serialize_property('Request-Problem-Information', Val) -> - <<16#17, Val>>; -serialize_property('Will-Delay-Interval', Val) -> - <<16#18, Val:32/big>>; -serialize_property('Request-Response-Information', Val) -> - <<16#19, Val>>; -serialize_property('Response-Information', Val) -> - <<16#1A, (serialize_utf8_string(Val))/binary>>; -serialize_property('Server-Reference', Val) -> - <<16#1C, (serialize_utf8_string(Val))/binary>>; -serialize_property('Reason-String', Val) -> - <<16#1F, (serialize_utf8_string(Val))/binary>>; -serialize_property('Receive-Maximum', Val) -> - <<16#21, Val:16/big>>; -serialize_property('Topic-Alias-Maximum', Val) -> - <<16#22, Val:16/big>>; -serialize_property('Topic-Alias', Val) -> - <<16#23, Val:16/big>>; -serialize_property('Maximum-QoS', Val) -> - <<16#24, Val>>; -serialize_property('Retain-Available', Val) -> - <<16#25, Val>>; -serialize_property('User-Property', {Key, Val}) -> - <<16#26, (serialize_utf8_pair({Key, Val}))/binary>>; -serialize_property('User-Property', Props) when is_list(Props) -> - << <<(serialize_property('User-Property', {Key, Val}))/binary>> - || {Key, Val} <- Props >>; -serialize_property('Maximum-Packet-Size', Val) -> - <<16#27, Val:32/big>>; -serialize_property('Wildcard-Subscription-Available', Val) -> - <<16#28, Val>>; -serialize_property('Subscription-Identifier-Available', Val) -> - <<16#29, Val>>; -serialize_property('Shared-Subscription-Available', Val) -> - <<16#2A, Val>>. - -serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> - << <<(serialize_utf8_string(Topic))/binary, - ?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >> - || {Topic, #{rh := Rh, rap := Rap, nl := Nl, qos := QoS}} <- TopicFilters >>; - -serialize_topic_filters(subscribe, TopicFilters, _Ver) -> - << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>> - || {Topic, #{qos := QoS}} <- TopicFilters >>; - -serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> - << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>. - -serialize_reason_codes(undefined) -> - <<>>; -serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) -> - << <> || Code <- ReasonCodes >>. - -serialize_utf8_pair({Name, Value}) -> - << (serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary >>. - -serialize_binary_data(Bin) -> - [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. - -serialize_utf8_string(undefined, false) -> - error(utf8_string_undefined); -serialize_utf8_string(undefined, true) -> - <<>>; -serialize_utf8_string(String, _AllowNull) -> - serialize_utf8_string(String). - -serialize_utf8_string(String) -> - StringBin = unicode:characters_to_binary(String), - Len = byte_size(StringBin), - true = (Len =< 16#ffff), - <>. - -serialize_remaining_len(I) -> - serialize_variable_byte_integer(I). - -serialize_variable_byte_integer(N) when N =< ?LOWBITS -> - <<0:1, N:7>>; -serialize_variable_byte_integer(N) -> - <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. - -%% Is the frame too large? --spec(is_too_large(iodata(), pos_integer()) -> boolean()). -is_too_large(IoData, MaxSize) -> - iolist_size(IoData) >= MaxSize. - -get_property(_Key, undefined, Default) -> - Default; -get_property(Key, Props, Default) -> - maps:get(Key, Props, Default). - -%% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags -validate_header(?CONNECT, 0, 0, 0) -> ok; -validate_header(?CONNACK, 0, 0, 0) -> ok; -validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok; -validate_header(?PUBLISH, _, ?QOS_1, _) -> ok; -validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok; -validate_header(?PUBACK, 0, 0, 0) -> ok; -validate_header(?PUBREC, 0, 0, 0) -> ok; -validate_header(?PUBREL, 0, 1, 0) -> ok; -validate_header(?PUBCOMP, 0, 0, 0) -> ok; -validate_header(?SUBSCRIBE, 0, 1, 0) -> ok; -validate_header(?SUBACK, 0, 0, 0) -> ok; -validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok; -validate_header(?UNSUBACK, 0, 0, 0) -> ok; -validate_header(?PINGREQ, 0, 0, 0) -> ok; -validate_header(?PINGRESP, 0, 0, 0) -> ok; -validate_header(?DISCONNECT, 0, 0, 0) -> ok; -validate_header(?AUTH, 0, 0, 0) -> ok; -validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header). - --compile({inline, [validate_packet_id/1]}). -validate_packet_id(0) -> error(bad_packet_id); -validate_packet_id(_) -> ok. - -validate_subqos([3|_]) -> error(bad_subqos); -validate_subqos([_|T]) -> validate_subqos(T); -validate_subqos([]) -> ok. - -bool(0) -> false; -bool(1) -> true. - -flag(undefined) -> ?RESERVED; -flag(false) -> 0; -flag(true) -> 1; -flag(X) when is_integer(X) -> X; -flag(B) when is_binary(B) -> 1. - -fixqos(?PUBREL, 0) -> 1; -fixqos(?SUBSCRIBE, 0) -> 1; -fixqos(?UNSUBSCRIBE, 0) -> 1; -fixqos(_Type, QoS) -> QoS. - diff --git a/apps/iot/src/emqtt/emqtt_props.erl b/apps/iot/src/emqtt/emqtt_props.erl deleted file mode 100644 index a30f037..0000000 --- a/apps/iot/src/emqtt/emqtt_props.erl +++ /dev/null @@ -1,172 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc MQTT5 Properties --module(emqtt_props). - --include("emqtt.hrl"). - --export([id/1, name/1, filter/2, validate/1]). - -%% For tests --export([all/0]). - --type(prop_name() :: atom()). --type(prop_id() :: pos_integer()). - --define(PROPS_TABLE, - #{16#01 => {'Payload-Format-Indicator', 'Byte', [?PUBLISH]}, - 16#02 => {'Message-Expiry-Interval', 'Four-Byte-Integer', [?PUBLISH]}, - 16#03 => {'Content-Type', 'UTF8-Encoded-String', [?PUBLISH]}, - 16#08 => {'Response-Topic', 'UTF8-Encoded-String', [?PUBLISH]}, - 16#09 => {'Correlation-Data', 'Binary-Data', [?PUBLISH]}, - 16#0B => {'Subscription-Identifier', 'Variable-Byte-Integer', [?PUBLISH, ?SUBSCRIBE]}, - 16#11 => {'Session-Expiry-Interval', 'Four-Byte-Integer', [?CONNECT, ?CONNACK, ?DISCONNECT]}, - 16#12 => {'Assigned-Client-Identifier', 'UTF8-Encoded-String', [?CONNACK]}, - 16#13 => {'Server-Keep-Alive', 'Two-Byte-Integer', [?CONNACK]}, - 16#15 => {'Authentication-Method', 'UTF8-Encoded-String', [?CONNECT, ?CONNACK, ?AUTH]}, - 16#16 => {'Authentication-Data', 'Binary-Data', [?CONNECT, ?CONNACK, ?AUTH]}, - 16#17 => {'Request-Problem-Information', 'Byte', [?CONNECT]}, - 16#18 => {'Will-Delay-Interval', 'Four-Byte-Integer', ['WILL']}, - 16#19 => {'Request-Response-Information', 'Byte', [?CONNECT]}, - 16#1A => {'Response-Information', 'UTF8-Encoded-String', [?CONNACK]}, - 16#1C => {'Server-Reference', 'UTF8-Encoded-String', [?CONNACK, ?DISCONNECT]}, - 16#1F => {'Reason-String', 'UTF8-Encoded-String', [?CONNACK, ?DISCONNECT, ?PUBACK, - ?PUBREC, ?PUBREL, ?PUBCOMP, - ?SUBACK, ?UNSUBACK, ?AUTH]}, - 16#21 => {'Receive-Maximum', 'Two-Byte-Integer', [?CONNECT, ?CONNACK]}, - 16#22 => {'Topic-Alias-Maximum', 'Two-Byte-Integer', [?CONNECT, ?CONNACK]}, - 16#23 => {'Topic-Alias', 'Two-Byte-Integer', [?PUBLISH]}, - 16#24 => {'Maximum-QoS', 'Byte', [?CONNACK]}, - 16#25 => {'Retain-Available', 'Byte', [?CONNACK]}, - 16#26 => {'User-Property', 'UTF8-String-Pair', 'ALL'}, - 16#27 => {'Maximum-Packet-Size', 'Four-Byte-Integer', [?CONNECT, ?CONNACK]}, - 16#28 => {'Wildcard-Subscription-Available', 'Byte', [?CONNACK]}, - 16#29 => {'Subscription-Identifier-Available', 'Byte', [?CONNACK]}, - 16#2A => {'Shared-Subscription-Available', 'Byte', [?CONNACK]} - }). - --spec(id(prop_name()) -> prop_id()). -id('Payload-Format-Indicator') -> 16#01; -id('Message-Expiry-Interval') -> 16#02; -id('Content-Type') -> 16#03; -id('Response-Topic') -> 16#08; -id('Correlation-Data') -> 16#09; -id('Subscription-Identifier') -> 16#0B; -id('Session-Expiry-Interval') -> 16#11; -id('Assigned-Client-Identifier') -> 16#12; -id('Server-Keep-Alive') -> 16#13; -id('Authentication-Method') -> 16#15; -id('Authentication-Data') -> 16#16; -id('Request-Problem-Information') -> 16#17; -id('Will-Delay-Interval') -> 16#18; -id('Request-Response-Information') -> 16#19; -id('Response-Information') -> 16#1A; -id('Server-Reference') -> 16#1C; -id('Reason-String') -> 16#1F; -id('Receive-Maximum') -> 16#21; -id('Topic-Alias-Maximum') -> 16#22; -id('Topic-Alias') -> 16#23; -id('Maximum-QoS') -> 16#24; -id('Retain-Available') -> 16#25; -id('User-Property') -> 16#26; -id('Maximum-Packet-Size') -> 16#27; -id('Wildcard-Subscription-Available') -> 16#28; -id('Subscription-Identifier-Available') -> 16#29; -id('Shared-Subscription-Available') -> 16#2A; -id(Name) -> error({bad_property, Name}). - --spec(name(prop_id()) -> prop_name()). -name(16#01) -> 'Payload-Format-Indicator'; -name(16#02) -> 'Message-Expiry-Interval'; -name(16#03) -> 'Content-Type'; -name(16#08) -> 'Response-Topic'; -name(16#09) -> 'Correlation-Data'; -name(16#0B) -> 'Subscription-Identifier'; -name(16#11) -> 'Session-Expiry-Interval'; -name(16#12) -> 'Assigned-Client-Identifier'; -name(16#13) -> 'Server-Keep-Alive'; -name(16#15) -> 'Authentication-Method'; -name(16#16) -> 'Authentication-Data'; -name(16#17) -> 'Request-Problem-Information'; -name(16#18) -> 'Will-Delay-Interval'; -name(16#19) -> 'Request-Response-Information'; -name(16#1A) -> 'Response-Information'; -name(16#1C) -> 'Server-Reference'; -name(16#1F) -> 'Reason-String'; -name(16#21) -> 'Receive-Maximum'; -name(16#22) -> 'Topic-Alias-Maximum'; -name(16#23) -> 'Topic-Alias'; -name(16#24) -> 'Maximum-QoS'; -name(16#25) -> 'Retain-Available'; -name(16#26) -> 'User-Property'; -name(16#27) -> 'Maximum-Packet-Size'; -name(16#28) -> 'Wildcard-Subscription-Available'; -name(16#29) -> 'Subscription-Identifier-Available'; -name(16#2A) -> 'Shared-Subscription-Available'; -name(Id) -> error({unsupported_property, Id}). - -filter(PacketType, Props) when is_map(Props) -> - maps:from_list(filter(PacketType, maps:to_list(Props))); - -filter(PacketType, Props) when ?CONNECT =< PacketType, PacketType =< ?AUTH, is_list(Props) -> - Filter = fun(Name) -> - case maps:find(id(Name), ?PROPS_TABLE) of - {ok, {Name, _Type, 'ALL'}} -> - true; - {ok, {Name, _Type, AllowedTypes}} -> - lists:member(PacketType, AllowedTypes); - error -> false - end - end, - [Prop || Prop = {Name, _} <- Props, Filter(Name)]. - -validate(Props) when is_map(Props) -> - lists:foreach(fun validate_prop/1, maps:to_list(Props)). - -validate_prop(Prop = {Name, Val}) -> - case maps:find(id(Name), ?PROPS_TABLE) of - {ok, {Name, Type, _}} -> - validate_value(Type, Val) - orelse error(bad_property, Prop); - error -> - error({bad_property, Prop}) - end. - -validate_value('Byte', Val) -> - is_integer(Val) andalso Val =< 16#FF; -validate_value('Two-Byte-Integer', Val) -> - is_integer(Val) andalso 0 =< Val andalso Val =< 16#FFFF; -validate_value('Four-Byte-Integer', Val) -> - is_integer(Val) andalso 0 =< Val andalso Val =< 16#FFFFFFFF; -validate_value('Variable-Byte-Integer', Val) -> - is_integer(Val) andalso 0 =< Val andalso Val =< 16#7FFFFFFF; -validate_value('UTF8-String-Pair', {Name, Val}) -> - validate_value('UTF8-Encoded-String', Name) - andalso validate_value('UTF8-Encoded-String', Val); -validate_value('UTF8-String-Pair', Pairs) when is_list(Pairs) -> - lists:foldl(fun(Pair, OK) -> - OK andalso validate_value('UTF8-String-Pair', Pair) - end, true, Pairs); -validate_value('UTF8-Encoded-String', Val) -> - is_binary(Val); -validate_value('Binary-Data', Val) -> - is_binary(Val); -validate_value(_Type, _Val) -> false. - --spec(all() -> map()). -all() -> ?PROPS_TABLE. - diff --git a/apps/iot/src/emqtt/emqtt_sock.erl b/apps/iot/src/emqtt/emqtt_sock.erl deleted file mode 100644 index 05b234f..0000000 --- a/apps/iot/src/emqtt/emqtt_sock.erl +++ /dev/null @@ -1,120 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqtt_sock). - --export([connect/4, send/2, recv/2, close/1 ]). - --export([ sockname/1, setopts/2, getstat/2 ]). - --record(ssl_socket, { - tcp, - ssl -}). - --type(socket() :: inet:socket() | #ssl_socket{}). - --type(sockname() :: {inet:ip_address(), inet:port_number()}). - --type(option() :: gen_tcp:connect_option() | {ssl_opts, [ssl:ssl_option()]}). - --export_type([socket/0, option/0]). - --define(DEFAULT_TCP_OPTIONS, [binary, {packet, raw}, {active, false}, - {nodelay, true}]). - --spec(connect(inet:ip_address() | inet:hostname(), - inet:port_number(), [option()], timeout()) - -> {ok, socket()} | {error, term()}). -connect(Host, Port, SockOpts, Timeout) -> - TcpOpts = merge_opts(?DEFAULT_TCP_OPTIONS, - lists:keydelete(ssl_opts, 1, SockOpts)), - case gen_tcp:connect(Host, Port, TcpOpts, Timeout) of - {ok, Sock} -> - case lists:keyfind(ssl_opts, 1, SockOpts) of - {ssl_opts, SslOpts} -> - ssl_upgrade(Sock, SslOpts, Timeout); - false -> - {ok, Sock} - end; - {error, Reason} -> - {error, Reason} - end. - -ssl_upgrade(Sock, SslOpts, Timeout) -> - TlsVersions = proplists:get_value(versions, SslOpts, []), - Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)), - SslOpts2 = merge_opts(SslOpts, [{ciphers, Ciphers}]), - case ssl:connect(Sock, SslOpts2, Timeout) of - {ok, SslSock} -> - ok = ssl:controlling_process(SslSock, self()), - {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; - {error, Reason} -> - {error, Reason} - end. - --spec(send(socket(), iodata()) -> ok | {error, einval | closed}). -send(Sock, Data) when is_port(Sock) -> - gen_tcp:send(Sock, Data); -send(#ssl_socket{ssl = SslSock}, Data) -> - ssl:send(SslSock, Data). - --spec(recv(socket(), non_neg_integer()) - -> {ok, iodata()} | {error, closed | inet:posix()}). -recv(Sock, Length) when is_port(Sock) -> - gen_tcp:recv(Sock, Length); -recv(#ssl_socket{ssl = SslSock}, Length) -> - ssl:recv(SslSock, Length). - --spec(close(socket()) -> ok). -close(Sock) when is_port(Sock) -> - gen_tcp:close(Sock); -close(#ssl_socket{ssl = SslSock}) -> - ssl:close(SslSock). - --spec(setopts(socket(), [gen_tcp:option() | ssl:socketoption()]) -> ok). -setopts(Sock, Opts) when is_port(Sock) -> - inet:setopts(Sock, Opts); -setopts(#ssl_socket{ssl = SslSock}, Opts) -> - ssl:setopts(SslSock, Opts). - --spec(getstat(socket(), [atom()]) - -> {ok, [{atom(), integer()}]} | {error, term()}). -getstat(Sock, Options) when is_port(Sock) -> - inet:getstat(Sock, Options); -getstat(#ssl_socket{tcp = Sock}, Options) -> - inet:getstat(Sock, Options). - --spec(sockname(socket()) -> {ok, sockname()} | {error, term()}). -sockname(Sock) when is_port(Sock) -> - inet:sockname(Sock); -sockname(#ssl_socket{ssl = SslSock}) -> - ssl:sockname(SslSock). - --spec(merge_opts(list(), list()) -> list()). -merge_opts(Defaults, Options) -> - lists:foldl( - fun({Opt, Val}, Acc) -> - lists:keystore(Opt, 1, Acc, {Opt, Val}); - (Opt, Acc) -> - lists:usort([Opt | Acc]) - end, Defaults, Options). - -default_ciphers(TlsVersions) -> - lists:foldl( - fun(TlsVer, Ciphers) -> - Ciphers ++ ssl:cipher_suites(all, TlsVer) - end, [], TlsVersions). \ No newline at end of file diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index e49b895..f1776da 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -6,8 +6,6 @@ -behaviour(application). --include("iot.hrl"). - -export([start/2, stop/1]). start(_StartType, _StartArgs) -> diff --git a/apps/iot/src/iot_cipher_aes.erl b/apps/iot/src/iot_cipher_aes.erl deleted file mode 100644 index 4941134..0000000 --- a/apps/iot/src/iot_cipher_aes.erl +++ /dev/null @@ -1,33 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2018, -%%% @doc -%%% -%%% @end -%%% Created : 29. 六月 2018 09:30 -%%%------------------------------------------------------------------- --module(iot_cipher_aes). --author("aresei"). - -%% API --export([encrypt/2, decrypt/2]). --export([test/0]). - -test() -> - Aes = list_to_binary(iot_util:rand_bytes(32)), - Enc = encrypt(Aes, <<"sdfsff hesdfs sfsdfsdffffffffffxyz yes call me">>), - Data = decrypt(Aes, Enc), - - lager:debug("enc: ~p, size: ~p, data len is: ~p, data: ~p", [Enc, byte_size(Enc), byte_size(Data), Data]). - -%% 基于aes的加密算法, aes_256_cbc --spec encrypt(binary(), binary()) -> binary(). -encrypt(Key, PlainText) when is_binary(Key), is_binary(PlainText) -> - IV = binary:part(Key, {0, 16}), - crypto:crypto_one_time(aes_256_cbc, Key, IV, PlainText, [{encrypt, true}, {padding, pkcs_padding}]). - -%% 基于aes的解密算法 --spec decrypt(binary(), binary()) -> binary(). -decrypt(Key, CipherText) when is_binary(Key), is_binary(CipherText) -> - IV = binary:part(Key, {0, 16}), - crypto:crypto_one_time(aes_256_cbc, Key, IV, CipherText, [{encrypt, false}, {padding, pkcs_padding}]). \ No newline at end of file diff --git a/apps/iot/src/iot_cipher_rsa.erl b/apps/iot/src/iot_cipher_rsa.erl deleted file mode 100644 index b962bf0..0000000 --- a/apps/iot/src/iot_cipher_rsa.erl +++ /dev/null @@ -1,34 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2018, -%%% @doc -%%% 采用的RSA 2048 PKCS1 -%%% @end -%%% Created : 21. 六月 2018 09:51 -%%%------------------------------------------------------------------- --module(iot_cipher_rsa). --author("aresei"). - -%% API --export([encode/2, decode/2, private_encode/2]). - -%% 解密数据 -decode(Data, PrivateKey) when is_binary(Data), is_binary(PrivateKey) -> - [Pri] = public_key:pem_decode(PrivateKey), - PriKeyEntry = public_key:pem_entry_decode(Pri), - public_key:decrypt_private(Data, PriKeyEntry). - -%% 解密数据 -encode(Data, PublicKey) when is_map(Data), is_binary(PublicKey) -> - BinData = jiffy:encode(Data, [force_utf8]), - encode(BinData, PublicKey); - -encode(Data, PublicKey) when is_binary(Data), is_binary(PublicKey) -> - [Pub] = public_key:pem_decode(PublicKey), - PubKey = public_key:pem_entry_decode(Pub), - public_key:encrypt_public(Data, PubKey). - -private_encode(Data, PrivateKey) when is_binary(Data), is_binary(PrivateKey) -> - [Private] = public_key:pem_decode(PrivateKey), - PriKey = public_key:pem_entry_decode(Private), - public_key:encrypt_private(Data, PriKey). \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index 702a458..07da233 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -48,12 +48,12 @@ {mysql_iot, [{size, 10}, {max_overflow, 20}, {worker_module, mysql}], [ - {host, {39, 98, 184, 67}}, + {host, "47.111.101.3"}, {port, 3306}, - {user, "nannonguser"}, + {user, "root"}, {connect_mode, synchronous}, {keep_alive, true}, - {password, "nannong@Fe7w"}, + {password, "r3a-7Qrh#3Q"}, {database, "nannong"}, {queries, [<<"set names utf8">>]} ] diff --git a/rebar.config b/rebar.config index 5eca0ed..31e3fe9 100644 --- a/rebar.config +++ b/rebar.config @@ -9,6 +9,7 @@ {mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}}, {eredis, ".*", {git, "https://github.com/wooga/eredis.git", {tag, "v1.2.0"}}}, {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}, + {emqtt, ".*", {git, "https://gitea.s5s8.com/anlicheng/emqtt.git", {branch, "main"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}. diff --git a/rebar.lock b/rebar.lock index 395fa9e..b604e34 100644 --- a/rebar.lock +++ b/rebar.lock @@ -8,6 +8,10 @@ {git,"https://github.com/ninenines/cowlib", {ref,"cc04201c1d0e1d5603cd1cde037ab729b192634c"}}, 1}, + {<<"emqtt">>, + {git,"https://gitea.s5s8.com/anlicheng/emqtt.git", + {ref,"5111914a9b1b92b0b497f825c77bdd365e3989b0"}}, + 0}, {<<"eredis">>, {git,"https://github.com/wooga/eredis.git", {ref,"9ad91f149310a7d002cb966f62b7e2c3330abb04"}},