diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 848c3e4..c77b57c 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,17 +21,14 @@ -define(SERVER, ?MODULE). -%% 消息的qos等级 --define(QOS, 1). - -record(state, { host :: #host{}, emqx_pid :: pid() }). -test(Id) when is_binary(Id) -> +test(Msg) when is_binary(Msg) -> Pid = get_pid(<<"1">>), - publish(Pid, <<"hello world">>). + publish(Pid, Msg). %%%=================================================================== %%% API @@ -110,7 +107,7 @@ init([Host = #host{host_id = HostId}]) -> {stop, Reason :: term(), NewState :: #state{}}). handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) -> Topic = <<"/host/", HostId/binary, "/downstream">>, - Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, ?QOS}]), + Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 2}]), {reply, Result, State}. %% @private @@ -133,7 +130,7 @@ handle_info({timeout, _, subscribe_ticker}, State = #state{host = #host{host_id %% 监听和host相关的全部事件 {ok, _} = emqtt:connect(ConnPid), Topics = [ - {<<"/host/", HostId/binary, "/upstream">>, ?QOS} + {<<"/host/", HostId/binary, "/upstream">>, 1} ], SubscribeResult = emqtt:subscribe(ConnPid, Topics), lager:debug("[iot_host] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]), @@ -144,12 +141,13 @@ handle_info({disconnect, ReasonCode, Properties}, State = #state{host = #host{ho lager:debug("[iot_host] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]), {stop, disconnected, State}; -handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) -> +handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{emqx_pid = _ConnPid, host = #host{host_id = HostId}}) -> lager:debug("[iot_host] host: ~p, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]), % emqtt:pubrec(ConnPid, PacketId), {noreply, State}; -handle_info({puback, #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{host = #host{host_id = HostId}}) -> - lager:debug("[iot_host] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]), +handle_info({puback, Msg = #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) -> + lager:debug("[iot_host] host: ~p, receive puback packet_id: ~p, reason_code: ~p, msg: ~p", [HostId, PacketId, ReasonCode, Msg]), + emqtt:pubrel(ConnPid, PacketId), {noreply, State}; handle_info(Info, State = #state{host = #host{host_id = HostId}}) -> diff --git a/apps/iot/src/iot_host_mocker.erl b/apps/iot/src/iot_host_mocker.erl index 5dadb28..26ff07d 100644 --- a/apps/iot/src/iot_host_mocker.erl +++ b/apps/iot/src/iot_host_mocker.erl @@ -20,9 +20,6 @@ -define(SERVER, ?MODULE). -%% 消息的qos等级 --define(QOS, 1). - -record(state, { host_id :: binary(), emqx_pid :: pid() @@ -73,16 +70,16 @@ init([HostId]) -> case emqtt:start_link(Opts) of {ok, ConnPid} -> - lager:debug("[iot_host] connect success, pid: ~p", [ConnPid]), - %% 快速启动避免阻塞iot_host_sup的启动 + lager:debug("[iot_host_mocker] connect success, pid: ~p", [ConnPid]), + %% 快速启动避免阻塞iot_host_mocker_sup的启动 erlang:start_timer(0, self(), subscribe_ticker), {ok, #state{host_id = HostId, emqx_pid = ConnPid}}; ignore -> - lager:debug("[iot_host] connect emqx get ignore"), + lager:debug("[iot_host_mocker] connect emqx get ignore"), {stop, ignore}; {error, Reason} -> - lager:debug("[iot_host] connect emqx get error: ~p", [Reason]), + lager:debug("[iot_host_mocker] connect emqx get error: ~p", [Reason]), {stop, Reason} end. @@ -98,7 +95,7 @@ init([HostId]) -> {stop, Reason :: term(), NewState :: #state{}}). handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host_id = HostId}) -> Topic = <<"/host/", HostId/binary, "/upstream">>, - Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, ?QOS}]), + Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 1}]), {reply, Result, State}. %% @private @@ -121,23 +118,33 @@ handle_info({timeout, _, subscribe_ticker}, State = #state{emqx_pid = ConnPid, h %% 监听和host相关的全部事件 {ok, _} = emqtt:connect(ConnPid), Topics = [ - {<<"/host/", HostId/binary, "/downstream">>, ?QOS} + {<<"/host/", HostId/binary, "/downstream">>, 2} ], SubscribeResult = emqtt:subscribe(ConnPid, Topics), - lager:debug("[iot_host] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]), + lager:debug("[iot_host_mocker] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]), {noreply, State#state{emqx_pid = ConnPid}}; handle_info({disconnect, ReasonCode, Properties}, State = #state{host_id = HostId}) -> - lager:debug("[iot_host] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]), + lager:debug("[iot_host_mocker] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]), {stop, disconnected, State}; -handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) -> - lager:debug("[iot_host] host: ~p, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]), - % emqtt:pubrec(ConnPid, PacketId), +%% 收到qos为2的包 +handle_info({publish, Message = #{packet_id := PacketId, payload := Payload, qos := 2}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) -> + lager:debug("[iot_host_mocker] host: ~p, qos: 2, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]), + %% 回复收到的请求信息 + emqtt:pubrec(ConnPid, PacketId), + + {noreply, State}; + +handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = _ConnPid, host_id = HostId}) -> + lager:debug("[iot_host_mocker] host: ~p, Recv a publish packet: ~p, payload: ~p, packet_id: ~p", [HostId, Message, Payload, PacketId]), + %% 回复收到的请求信息 + % emqtt:puback(ConnPid, PacketId), {noreply, State}; handle_info({puback, #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{host_id = HostId}) -> - lager:debug("[iot_host] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]), + lager:debug("[iot_host_mocker] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]), + {noreply, State}; handle_info(Info, State = #state{host_id = HostId}) -> @@ -157,7 +164,7 @@ terminate(_Reason, _State = #state{emqx_pid = ConnPid}) when is_pid(ConnPid) -> ok = emqtt:stop(ConnPid), ok; terminate(Reason, _State) -> - lager:debug("[iot_host] terminate with reason: ~p", [Reason]), + lager:debug("[iot_host_mocker] terminate with reason: ~p", [Reason]), ok. %% @private