fix zd consumer

This commit is contained in:
anlicheng 2023-12-25 11:27:27 +08:00
parent d318ed42f5
commit f7ceb88ce7

View File

@ -32,10 +32,9 @@
]). ]).
-record(state, { -record(state, {
conn_pid :: pid(), conn_pid :: undefined | pid(),
logger_pid :: pid(), logger_pid :: pid(),
mqtt_props :: list(), mqtt_props :: list()
is_connected = false
}). }).
%%%=================================================================== %%%===================================================================
@ -83,9 +82,9 @@ init([]) ->
%% , %% ,
erlang:start_timer(0, self(), create_consumer), erlang:start_timer(0, self(), create_consumer),
%% %%
{ok, LoggerPid} = iot_logger:start_link("directive_data"), {ok, LoggerPid} = iot_logger:start_link("zd_directive_data"),
{ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid}}.
%% @private %% @private
%% @doc Handling call messages %% @doc Handling call messages
@ -97,8 +96,8 @@ init([]) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_call({mock, Request}, _From, State = #state{conn_pid = _ConnPid}) -> handle_call({mock, Request}, _From, State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
publish_directive(Request), publish_directive(Request, jiffy:encode(Request, [force_utf8])),
{reply, ok, State}. {reply, ok, State}.
%% @private %% @private
@ -120,19 +119,23 @@ handle_info({disconnect, ReasonCode, Properties}, State) ->
lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
{stop, disconnected, State}; {stop, disconnected, State};
%% json反序列需要在host进程进行 %% json反序列需要在host进程进行
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State) ->
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]),
Request = catch jiffy:decode(Payload, [return_maps]), Request = catch jiffy:decode(Payload, [return_maps]),
publish_directive(Request), publish_directive(Request, Payload),
{noreply, State}; {noreply, State};
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]),
{noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]), lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]),
{noreply, State}; {noreply, State};
handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, is_connected = false}) -> handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, conn_pid = undefined}) ->
try try
{ok, ConnPid} = create_consumer(Props), {ok, ConnPid} = create_consumer(Props),
{noreply, State#state{conn_pid = ConnPid}} {noreply, State#state{conn_pid = ConnPid}}
@ -149,12 +152,17 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
{noreply, State#state{conn_pid = undefined}}; {noreply, State#state{conn_pid = undefined}};
handle_info({directive_reply, Reply}, State = #state{}) -> handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid}) ->
lager:debug("[iot_zd_consumer] get directive_reply: ~ts", [Reply]), case Reply of
{ok, RawReq, DirectiveResult} ->
iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, DirectiveResult]);
{error, {RawReq, Error}} ->
iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error])
end,
{noreply, State}; {noreply, State};
handle_info(Info, State = #state{}) -> handle_info(Info, State = #state{}) ->
lager:debug("[iot_zd_consumer] get info: ~p", [Info]), lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),
{noreply, State}. {noreply, State}.
%% @private %% @private
@ -188,27 +196,28 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}) -> publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}, RawReq) ->
%% LocationCode查找到主机和Device_uuid %% LocationCode查找到主机和Device_uuid
ReceiverPid = self(),
case redis_client:hgetall(LocationCode) of case redis_client:hgetall(LocationCode) of
{ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} ->
case iot_host:get_pid(HostUUID) of case iot_host:get_pid(HostUUID) of
undefined -> undefined ->
lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); ReceiverPid ! {directive_reply, {error, RawReq, <<"host uuid: ", HostUUID/binary, " not found">>}};
Pid -> Pid ->
ReceiverPid = self(),
spawn(fun() -> spawn(fun() ->
DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT),
ReceiverPid ! {directive_reply, DirectiveResult} ReceiverPid ! {directive_reply, {ok, RawReq, DirectiveResult}}
end) end)
end; end;
{ok, Map} -> {ok, Map} when is_map(Map) ->
lager:notice("[iot_zd_consumer] location_code: ~p, redis data invalid: ~p", [LocationCode, Map]); RedisData = iolist_to_binary(jiffy:encode(Map, [force_utf8])),
ReceiverPid ! {directive_reply, {error, RawReq, <<"invalid redis data: ", RedisData/binary>>}};
_ -> _ ->
lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) ReceiverPid ! {directive_reply, {error, RawReq, <<"location_code: ", LocationCode/binary, " not found in redis">>}}
end; end;
publish_directive(Other) -> publish_directive(_Other, RawReq) ->
lager:notice("[iot_zd_consumer] get a unknown directive", [Other]). self() ! {directive_reply, {error, RawReq, <<"unknown directive">>}}.
-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. -spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
create_consumer(Props) when is_list(Props) -> create_consumer(Props) when is_list(Props) ->