fix zd consumer flight
This commit is contained in:
parent
f7ceb88ce7
commit
1046ee8979
@ -34,7 +34,9 @@
|
|||||||
-record(state, {
|
-record(state, {
|
||||||
conn_pid :: undefined | pid(),
|
conn_pid :: undefined | pid(),
|
||||||
logger_pid :: pid(),
|
logger_pid :: pid(),
|
||||||
mqtt_props :: list()
|
mqtt_props :: list(),
|
||||||
|
%% 执行中的任务数
|
||||||
|
flight_num = 0
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -119,13 +121,13 @@ handle_info({disconnect, ReasonCode, Properties}, State) ->
|
|||||||
lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
||||||
{stop, disconnected, State};
|
{stop, disconnected, State};
|
||||||
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
||||||
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State) ->
|
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State = #state{flight_num = FlightNum}) ->
|
||||||
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]),
|
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]),
|
||||||
|
|
||||||
Request = catch jiffy:decode(Payload, [return_maps]),
|
Request = catch jiffy:decode(Payload, [return_maps]),
|
||||||
publish_directive(Request, Payload),
|
publish_directive(Request, Payload),
|
||||||
|
|
||||||
{noreply, State};
|
{noreply, State#state{flight_num = FlightNum + 1}};
|
||||||
|
|
||||||
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
||||||
lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]),
|
lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]),
|
||||||
@ -152,14 +154,15 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
|||||||
|
|
||||||
{noreply, State#state{conn_pid = undefined}};
|
{noreply, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid}) ->
|
handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid, flight_num = FlightNum}) ->
|
||||||
|
FlightInfo = <<"flight_num: ", (integer_to_binary(FlightNum - 1))/binary>>,
|
||||||
case Reply of
|
case Reply of
|
||||||
{ok, RawReq, DirectiveResult} ->
|
{ok, RawReq, DirectiveResult} ->
|
||||||
iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, DirectiveResult]);
|
iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, DirectiveResult, FlightInfo]);
|
||||||
{error, {RawReq, Error}} ->
|
{error, {RawReq, Error}} ->
|
||||||
iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error])
|
iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error, FlightInfo])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State#state{flight_num = FlightNum - 1}};
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
handle_info(Info, State = #state{}) ->
|
||||||
lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),
|
lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user