diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index bd9996f..cdca9cb 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -34,7 +34,9 @@ -record(state, { conn_pid :: undefined | pid(), logger_pid :: pid(), - mqtt_props :: list() + mqtt_props :: list(), + %% 执行中的任务数 + flight_num = 0 }). %%%=================================================================== @@ -119,13 +121,13 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), {stop, disconnected, State}; %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 -handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State) -> +handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State = #state{flight_num = FlightNum}) -> lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]), Request = catch jiffy:decode(Payload, [return_maps]), publish_directive(Request, Payload), - {noreply, State}; + {noreply, State#state{flight_num = FlightNum + 1}}; handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]), @@ -152,14 +154,15 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, State#state{conn_pid = undefined}}; -handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid}) -> +handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid, flight_num = FlightNum}) -> + FlightInfo = <<"flight_num: ", (integer_to_binary(FlightNum - 1))/binary>>, case Reply of {ok, RawReq, DirectiveResult} -> - iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, DirectiveResult]); + iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, DirectiveResult, FlightInfo]); {error, {RawReq, Error}} -> - iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error]) + iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error, FlightInfo]) end, - {noreply, State}; + {noreply, State#state{flight_num = FlightNum - 1}}; handle_info(Info, State = #state{}) -> lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),