From 9b2f75f9d1b4e91da7e8317f3b3f22d26b072c43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 17 Apr 2023 17:28:22 +0800 Subject: [PATCH] fix --- apps/iot/src/iot_config.erl | 2 +- apps/iot/src/iot_mqtt_publisher.erl | 2 ++ apps/iot/src/iot_mqtt_subscriber.erl | 22 +++++++++++----------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/apps/iot/src/iot_config.erl b/apps/iot/src/iot_config.erl index 660c5bb..e11b8a2 100644 --- a/apps/iot/src/iot_config.erl +++ b/apps/iot/src/iot_config.erl @@ -29,7 +29,7 @@ emqt_opts() -> {username, Username}, {password, Password}, {keepalive, Keepalive}, - {auto_ack, false}, + {auto_ack, true}, {proto_ver, v5}, {retry_interval, RetryInterval} ]. \ No newline at end of file diff --git a/apps/iot/src/iot_mqtt_publisher.erl b/apps/iot/src/iot_mqtt_publisher.erl index a3c9fef..a8bfb8f 100644 --- a/apps/iot/src/iot_mqtt_publisher.erl +++ b/apps/iot/src/iot_mqtt_publisher.erl @@ -66,6 +66,7 @@ init([]) -> case emqtt:start_link(Opts) of {ok, ConnPid} -> lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]), + {ok, _} = emqtt:connect(ConnPid), {ok, #state{conn_pid = ConnPid}}; ignore -> lager:debug("[iot_mqtt_publisher] connect emqx get ignore"), @@ -86,6 +87,7 @@ init([]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). handle_call({publish, ReceiverPid, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid, inflight = InFlight}) -> + lager:debug("call me publish"), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of {ok, PacketId} -> Ref = make_ref(), diff --git a/apps/iot/src/iot_mqtt_subscriber.erl b/apps/iot/src/iot_mqtt_subscriber.erl index fe08c76..7704662 100644 --- a/apps/iot/src/iot_mqtt_subscriber.erl +++ b/apps/iot/src/iot_mqtt_subscriber.erl @@ -75,16 +75,16 @@ init([]) -> Opts = iot_config:emqt_opts(), case emqtt:start_link(Opts) of {ok, ConnPid} -> - lager:debug("[iot_host] connect success, pid: ~p", [ConnPid]), + lager:debug("[iot_mqtt_subscriber] connect success, pid: ~p", [ConnPid]), %% 快速启动避免阻塞iot_host_sup的启动 erlang:start_timer(0, self(), subscribe_ticker), {ok, #state{conn_pid = ConnPid}}; ignore -> - lager:debug("[iot_host] connect emqx get ignore"), + lager:debug("[iot_mqtt_subscriber] connect emqx get ignore"), {stop, ignore}; {error, Reason} -> - lager:debug("[iot_host] connect emqx get error: ~p", [Reason]), + lager:debug("[iot_mqtt_subscriber] connect emqx get error: ~p", [Reason]), {stop, Reason} end. @@ -121,25 +121,25 @@ handle_info({timeout, _, subscribe_ticker}, State = #state{conn_pid = ConnPid}) %% 监听和host相关的全部事件 {ok, _} = emqtt:connect(ConnPid), Topics = [ - {<<"$share/server/register">>, 1} + {<<"$share/nodes//server/register">>, 1} ], SubscribeResult = emqtt:subscribe(ConnPid, Topics), - lager:debug("[iot_mqtt_share_subscriber] subscribe result is: ~p", [SubscribeResult]), + lager:debug("[iot_mqtt_subscriber] subscribe result is: ~p", [SubscribeResult]), {noreply, State#state{conn_pid = ConnPid}}; handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> - lager:debug("[iot_mqtt_share_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + lager:debug("[iot_mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), {stop, disconnected, State}; handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{conn_pid = _ConnPid}) -> - lager:debug("[iot_mqtt_share_subscriber] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), + lager:debug("[iot_mqtt_subscriber] Recv a publish packet: ~p, payload: ~p", [Message, Payload]), {noreply, State}; handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> - lager:debug("[iot_mqtt_share_subscriber] receive puback packet: ~p", [Packet]), + lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), {noreply, State}; handle_info(Info, State = #state{}) -> - lager:debug("[iot_mqtt_share_subscriber] get info: ~p", [Info]), + lager:debug("[iot_mqtt_subscriber] get info: ~p", [Info]), {noreply, State}. %% @private @@ -153,10 +153,10 @@ terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>), ok = emqtt:disconnect(ConnPid), ok = emqtt:stop(ConnPid), - lager:debug("[iot_host] terminate with reason: ~p", [Reason]), + lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), ok; terminate(Reason, _State) -> - lager:debug("[iot_host] terminate with reason: ~p", [Reason]), + lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), ok. %% @private