From 4956e46035a568b6355b44f2db752b130eaf661a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 17 Apr 2023 15:53:34 +0800 Subject: [PATCH] fix mqtt --- apps/iot/src/iot_host.erl | 7 +++++-- apps/iot/src/iot_host_mocker.erl | 12 +++++++----- apps/iot/src/iot_mock.erl | 2 +- config/vm.args | 2 +- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index c77b57c..6780066 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -77,6 +77,8 @@ init([Host = #host{host_id = HostId}]) -> {username, Username}, {password, Password}, {keepalive, Keepalive}, + {auto_ack, false}, + {proto_ver, v5}, {retry_interval, RetryInterval} ], @@ -107,7 +109,7 @@ init([Host = #host{host_id = HostId}]) -> {stop, Reason :: term(), NewState :: #state{}}). handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) -> Topic = <<"/host/", HostId/binary, "/downstream">>, - Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 2}]), + Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 2}, {retain, true}]), {reply, Result, State}. %% @private @@ -161,10 +163,11 @@ handle_info(Info, State = #state{host = #host{host_id = HostId}}) -> %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(_Reason, _State = #state{emqx_pid = ConnPid}) when is_pid(ConnPid) -> +terminate(Reason, _State = #state{emqx_pid = ConnPid}) when is_pid(ConnPid) -> {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>), ok = emqtt:disconnect(ConnPid), ok = emqtt:stop(ConnPid), + lager:debug("[iot_host] terminate with reason: ~p", [Reason]), ok; terminate(Reason, _State) -> lager:debug("[iot_host] terminate with reason: ~p", [Reason]), diff --git a/apps/iot/src/iot_host_mocker.erl b/apps/iot/src/iot_host_mocker.erl index 26ff07d..9ec9099 100644 --- a/apps/iot/src/iot_host_mocker.erl +++ b/apps/iot/src/iot_host_mocker.erl @@ -37,7 +37,7 @@ publish(Pid, Message) when is_pid(Pid), is_binary(Message) -> -spec(start_link(HostId :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link(HostId) when is_binary(HostId) -> - gen_server:start_link(?MODULE, [HostId], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [HostId], []). %%%=================================================================== %%% gen_server callbacks @@ -65,6 +65,8 @@ init([HostId]) -> {username, Username}, {password, Password}, {keepalive, Keepalive}, + {proto_ver, v5}, + {auto_ack, true}, {retry_interval, RetryInterval} ], @@ -130,17 +132,17 @@ handle_info({disconnect, ReasonCode, Properties}, State = #state{host_id = HostI lager:debug("[iot_host_mocker] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]), {stop, disconnected, State}; %% 收到qos为2的包 -handle_info({publish, Message = #{packet_id := PacketId, payload := Payload, qos := 2}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) -> +handle_info({publish, Message = #{packet_id := PacketId, payload := Payload, reason_code := ReasonCode, qos := 2}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) -> lager:debug("[iot_host_mocker] host: ~p, qos: 2, Recv a publish packet: ~p, payload: ~p", [HostId, Message, Payload]), %% 回复收到的请求信息 - emqtt:pubrec(ConnPid, PacketId), + % emqtt:pubrec(ConnPid, PacketId, ReasonCode), {noreply, State}; -handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = _ConnPid, host_id = HostId}) -> +handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = ConnPid, host_id = HostId}) -> lager:debug("[iot_host_mocker] host: ~p, Recv a publish packet: ~p, payload: ~p, packet_id: ~p", [HostId, Message, Payload, PacketId]), %% 回复收到的请求信息 - % emqtt:puback(ConnPid, PacketId), + % emqtt:pubrec(ConnPid, PacketId), {noreply, State}; handle_info({puback, #{packet_id := PacketId, reason_code := ReasonCode}}, State = #state{host_id = HostId}) -> lager:debug("[iot_host_mocker] host: ~p, receive puback packet_id: ~p, reason_code: ~p", [HostId, PacketId, ReasonCode]), diff --git a/apps/iot/src/iot_mock.erl b/apps/iot/src/iot_mock.erl index 8b44b75..2357106 100644 --- a/apps/iot/src/iot_mock.erl +++ b/apps/iot/src/iot_mock.erl @@ -41,7 +41,7 @@ insert_hosts() -> }, host_model:add_host(Host) - end, lists:seq(1, 100)). + end, lists:seq(1, 1)). insert_services(HostId) -> lists:foreach(fun(Id0) -> diff --git a/config/vm.args b/config/vm.args index b636b0b..0ad9675 100644 --- a/config/vm.args +++ b/config/vm.args @@ -5,7 +5,7 @@ +K true +A30 --mnesia dir '"//usr/local/code/data/iot"' +-mnesia dir '"/usr/local/code/data/iot"' -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40