fix
This commit is contained in:
parent
231ddbad22
commit
e7044534a5
@ -62,7 +62,7 @@ init([]) ->
|
|||||||
%% 启动日志记录器
|
%% 启动日志记录器
|
||||||
{ok, LoggerPid} = iot_logger:start_link("directive_data"),
|
{ok, LoggerPid} = iot_logger:start_link("directive_data"),
|
||||||
|
|
||||||
{ok, disconnected, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}.
|
{ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -97,7 +97,7 @@ handle_info({disconnect, ReasonCode, Properties}, State) ->
|
|||||||
{stop, disconnected, State};
|
{stop, disconnected, State};
|
||||||
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
||||||
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
||||||
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]),
|
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]),
|
||||||
%% 将消息分发到对应的host进程去处理
|
%% 将消息分发到对应的host进程去处理
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
||||||
@ -119,7 +119,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
|||||||
lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]),
|
lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]),
|
||||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
|
||||||
|
|
||||||
{next_state, disconnected, State#state{conn_pid = undefined}};
|
{noreply, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
handle_info(Info, State = #state{}) ->
|
||||||
lager:debug("[iot_zd_consumer] get info: ~p", [Info]),
|
lager:debug("[iot_zd_consumer] get info: ~p", [Info]),
|
||||||
|
|||||||
@ -235,6 +235,9 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields =
|
|||||||
<<"ts">> => Timestamp,
|
<<"ts">> => Timestamp,
|
||||||
<<"properties">> => Fields
|
<<"properties">> => Fields
|
||||||
},
|
},
|
||||||
|
try
|
||||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}
|
||||||
ok.
|
catch _:_ ->
|
||||||
|
self() ! {ack, Id, <<"json error">>}
|
||||||
|
end.
|
||||||
@ -102,7 +102,7 @@ handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode
|
|||||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
||||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||||
ok ->
|
ok ->
|
||||||
ReceiverPid ! {ack, Id},
|
ReceiverPid ! {ack, Id, Message},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
{ok, PacketId} ->
|
{ok, PacketId} ->
|
||||||
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};
|
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user