From e7044534a58f26b0e9cd19273ebf5787a14d63f5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 15:28:58 +0800 Subject: [PATCH] fix --- apps/iot/src/consumer/iot_zd_consumer.erl | 6 +++--- apps/iot/src/endpoint/iot_zd_endpoint.erl | 9 ++++++--- apps/iot/src/postman/mqtt_postman.erl | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index 8d79d6d..eae3ad5 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -62,7 +62,7 @@ init([]) -> %% 启动日志记录器 {ok, LoggerPid} = iot_logger:start_link("directive_data"), - {ok, disconnected, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. + {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. %% @private %% @doc Handling call messages @@ -97,7 +97,7 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> {stop, disconnected, State}; %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> - lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), %% 将消息分发到对应的host进程去处理 {noreply, State}; handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> @@ -119,7 +119,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]), erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer), - {next_state, disconnected, State#state{conn_pid = undefined}}; + {noreply, State#state{conn_pid = undefined}}; handle_info(Info, State = #state{}) -> lager:debug("[iot_zd_consumer] get info: ~p", [Info]), diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index e089ed7..eeeaefd 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -235,6 +235,9 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = <<"ts">> => Timestamp, <<"properties">> => Fields }, - Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, - ok. \ No newline at end of file + try + Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), + PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}} + catch _:_ -> + self() ! {ack, Id, <<"json error">>} + end. \ No newline at end of file diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 17dc044..6e0177b 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -102,7 +102,7 @@ handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> - ReceiverPid ! {ack, Id}, + ReceiverPid ! {ack, Id, Message}, {noreply, State}; {ok, PacketId} -> {noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};