From f7ceb88ce710565882c11c4b03622fc202a08f3d Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 25 Dec 2023 11:27:27 +0800 Subject: [PATCH] fix zd consumer --- apps/iot/src/consumer/iot_zd_consumer.erl | 55 +++++++++++++---------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index 98f80b4..bd9996f 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -32,10 +32,9 @@ ]). -record(state, { - conn_pid :: pid(), + conn_pid :: undefined | pid(), logger_pid :: pid(), - mqtt_props :: list(), - is_connected = false + mqtt_props :: list() }). %%%=================================================================== @@ -83,9 +82,9 @@ init([]) -> %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_consumer), %% 启动日志记录器 - {ok, LoggerPid} = iot_logger:start_link("directive_data"), + {ok, LoggerPid} = iot_logger:start_link("zd_directive_data"), - {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. + {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid}}. %% @private %% @doc Handling call messages @@ -97,8 +96,8 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({mock, Request}, _From, State = #state{conn_pid = _ConnPid}) -> - publish_directive(Request), +handle_call({mock, Request}, _From, State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> + publish_directive(Request, jiffy:encode(Request, [force_utf8])), {reply, ok, State}. %% @private @@ -120,19 +119,23 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), {stop, disconnected, State}; %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 -handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> - lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), +handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State) -> + lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]), Request = catch jiffy:decode(Payload, [return_maps]), - publish_directive(Request), + publish_directive(Request, Payload), {noreply, State}; +handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> + lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]), + {noreply, State}; + handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]), {noreply, State}; -handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, is_connected = false}) -> +handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, conn_pid = undefined}) -> try {ok, ConnPid} = create_consumer(Props), {noreply, State#state{conn_pid = ConnPid}} @@ -149,12 +152,17 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, State#state{conn_pid = undefined}}; -handle_info({directive_reply, Reply}, State = #state{}) -> - lager:debug("[iot_zd_consumer] get directive_reply: ~ts", [Reply]), +handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid}) -> + case Reply of + {ok, RawReq, DirectiveResult} -> + iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, DirectiveResult]); + {error, {RawReq, Error}} -> + iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error]) + end, {noreply, State}; handle_info(Info, State = #state{}) -> - lager:debug("[iot_zd_consumer] get info: ~p", [Info]), + lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]), {noreply, State}. %% @private @@ -188,27 +196,28 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}) -> +publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}, RawReq) -> %% 通过LocationCode查找到主机和Device_uuid + ReceiverPid = self(), case redis_client:hgetall(LocationCode) of {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> case iot_host:get_pid(HostUUID) of undefined -> - lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); + ReceiverPid ! {directive_reply, {error, RawReq, <<"host uuid: ", HostUUID/binary, " not found">>}}; Pid -> - ReceiverPid = self(), spawn(fun() -> DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), - ReceiverPid ! {directive_reply, DirectiveResult} + ReceiverPid ! {directive_reply, {ok, RawReq, DirectiveResult}} end) end; - {ok, Map} -> - lager:notice("[iot_zd_consumer] location_code: ~p, redis data invalid: ~p", [LocationCode, Map]); + {ok, Map} when is_map(Map) -> + RedisData = iolist_to_binary(jiffy:encode(Map, [force_utf8])), + ReceiverPid ! {directive_reply, {error, RawReq, <<"invalid redis data: ", RedisData/binary>>}}; _ -> - lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) + ReceiverPid ! {directive_reply, {error, RawReq, <<"location_code: ", LocationCode/binary, " not found in redis">>}} end; -publish_directive(Other) -> - lager:notice("[iot_zd_consumer] get a unknown directive", [Other]). +publish_directive(_Other, RawReq) -> + self() ! {directive_reply, {error, RawReq, <<"unknown directive">>}}. -spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. create_consumer(Props) when is_list(Props) ->