iot_cloud/apps/iot/src/mocker/iot_mqtt_consumer.erl
2025-05-09 22:51:32 +08:00

281 lines
12 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享
%%% 2. host进程不能直接去监听topic这样涉及到新增和下线的很多问题
%%% @end
%%% Created : 12. 3月 2023 21:27
%%%-------------------------------------------------------------------
-module(iot_mqtt_consumer).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([mock/5]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% Alias for the module name. NOTE(review): the visible code uses ?MODULE directly.
-define(SERVER, ?MODULE).
%% Delay, in milliseconds, before retrying to create the MQTT consumer
%% (passed to erlang:start_timer/3).
-define(RETRY_INTERVAL, 5000).
%% Execution timeout. NOTE(review): not referenced in the visible part of this module.
-define(EXECUTE_TIMEOUT, 10 * 1000).
%% Topics to subscribe to; each entry is a {TopicName, Number} pair
%% (the number is presumably the QoS level - confirm against emqtt:subscribe).
-define(Topics,[
{<<"CET/NX/download">>, 2}
]).
%% gen_server state of this consumer.
-record(state, {
%% pid of the MQTT connection process; undefined while no connection exists
conn_pid :: undefined | pid(),
%% pid of the iot_logger process that directive results are written to
logger_pid :: pid(),
%% connection properties read from the `zhongdian` key of the `iot` application env
mqtt_props :: list(),
%% Number of directives currently being executed
flight_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Builds a version "1.0" directive of type "ctrl" for LocationCode and
%% hands it to this server through a synchronous `{mock, Req}' call.
%% Both timestamps are read separately from iot_util:current_time/0.
mock(LocationCode, Para, SType, CType, Value) when is_binary(LocationCode), is_integer(Para), is_integer(SType), is_integer(CType) ->
    %% Read the envelope timestamp first, then the properties timestamp.
    EnvelopeTs = iot_util:current_time(),
    PropertiesTs = iot_util:current_time(),
    Properties = #{
        <<"type">> => <<"ctrl">>,
        <<"para">> => Para,
        <<"stype">> => SType,
        <<"ctype">> => CType,
        <<"value">> => Value,
        <<"timestamp">> => PropertiesTs
    },
    Directive = #{
        <<"version">> => <<"1.0">>,
        <<"ts">> => EnvelopeTs,
        <<"properties">> => Properties,
        <<"location_code">> => LocationCode
    },
    gen_server:call(?MODULE, {mock, Directive}).
%% @doc Starts the gen_server linked to the caller and registers it locally
%% under the module name, so only one instance can run per node.
-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server: traps exits, reads the `zhongdian' connection
%% properties from the `iot' application environment, schedules the consumer
%% creation and starts the directive logger.
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([]) ->
    erlang:process_flag(trap_exit, true),
    {ok, MqttProps} = application:get_env(iot, zhongdian),
    %% The consumer is created from a zero-delay timer message instead of
    %% here, so that init/1 is not blocked by the connection attempt.
    erlang:start_timer(0, self(), create_consumer),
    %% Start the logger that records directive results.
    {ok, Logger} = iot_logger:start_link("zd_directive_data"),
    InitialState = #state{
        conn_pid = undefined,
        logger_pid = Logger,
        mqtt_props = MqttProps
    },
    {ok, InitialState}.
%% @private
%% @doc Handling call messages.
%%
%% `{mock, Request}': encodes Request as JSON, feeds it through
%% publish_directive/2 and counts it as one more in-flight directive.
%% Replies `ok' while an MQTT connection pid is held, and
%% `{error, not_connected}' otherwise (previously that case crashed the
%% server with function_clause).
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_call({mock, Request}, _From, State = #state{conn_pid = ConnPid, flight_num = FlightNum}) when is_pid(ConnPid) ->
    %% jiffy:encode/2 returns iodata; flatten it so the raw request has the
    %% same shape (a binary) as a payload received over MQTT.
    RawReq = iolist_to_binary(jiffy:encode(Request, [force_utf8])),
    publish_directive(Request, RawReq),
    {reply, ok, State#state{flight_num = FlightNum + 1}};
handle_call({mock, _Request}, _From, State = #state{}) ->
    %% No MQTT connection yet (conn_pid is undefined): refuse the mock
    %% request instead of crashing the whole server.
    {reply, {error, not_connected}, State}.
%% @private
%% @doc Handling cast messages. No casts are part of this server's
%% protocol, so every cast is dropped and the state is left untouched.
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_cast(_Ignored, #state{} = Unchanged) ->
    {noreply, Unchanged}.
%% @private
%% @doc Handling all non call/cast messages.
%%
%% Messages handled:
%%   {disconnect, ReasonCode, Properties} - stops the server with reason `disconnected'.
%%   {publish, Packet} with qos 2         - decodes the JSON payload and passes it to
%%                                          publish_directive/2; other QoS values are
%%                                          only logged.
%%   {puback, Packet}                     - logged only.
%%   {timeout, _, create_consumer}        - (re)creates the MQTT consumer, retrying after
%%                                          ?RETRY_INTERVAL on failure.
%%   {'EXIT', Pid, Reason}                - restarts the consumer or the logger, depending
%%                                          on which linked process died.
%%   {directive_reply, Reply}             - writes the result to the logger and decrements
%%                                          the in-flight counter.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_info({disconnect, ReasonCode, Properties}, State) ->
    lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
    {stop, disconnected, State};
%% Incoming directive on a subscribed topic (only QoS 2 is accepted).
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State = #state{flight_num = FlightNum}) ->
    lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]),
    %% A payload that fails to decode yields a non-map term, which
    %% publish_directive/2 reports as an unknown directive.
    publish_directive(decode_payload(Payload), Payload),
    {noreply, State#state{flight_num = FlightNum + 1}};
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
    lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]),
    {noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
    lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]),
    {noreply, State};
handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, conn_pid = undefined}) ->
    try
        {ok, ConnPid} = create_consumer(Props),
        {noreply, State#state{conn_pid = ConnPid}}
    catch _:Error:Stack ->
        %% The password is masked so credentials do not end up in the log.
        lager:warning("[iot_zd_consumer] config: ~p, create consumer get error: ~p, stack: ~p", [redact_password(Props), Error, Stack]),
        erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
        {noreply, State#state{conn_pid = undefined}}
    end;
%% The MQTT connection process died: schedule the creation of a new one.
handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
    lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]),
    erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
    {noreply, State#state{conn_pid = undefined}};
%% The logger process died: start a replacement.
handle_info({'EXIT', LoggerPid, Reason}, State = #state{logger_pid = LoggerPid}) ->
    lager:warning("[iot_zd_consumer] logger exited with reason: ~p", [Reason]),
    %% Bind a fresh variable: LoggerPid is already bound to the dead pid, so
    %% matching the new pid against it would always fail with badmatch.
    {ok, NewLoggerPid} = iot_logger:start_link("zd_directive_data"),
    {noreply, State#state{logger_pid = NewLoggerPid}};
handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid, flight_num = FlightNum}) ->
    %% The logged count is the number of directives still in flight once
    %% this reply has been accounted for.
    FlightInfo = <<"flight_num: ", (integer_to_binary(FlightNum - 1))/binary>>,
    log_directive_reply(LoggerPid, Reply, FlightInfo),
    {noreply, State#state{flight_num = FlightNum - 1}};
handle_info(Info, State = #state{}) ->
    lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),
    {noreply, State}.

%% Decodes a JSON payload into maps; any decoding exception is converted into
%% a `{decode_error, Class, Reason}' tuple instead of being propagated.
decode_payload(Payload) ->
    try
        jiffy:decode(Payload, [return_maps])
    catch
        Class:Reason ->
            {decode_error, Class, Reason}
    end.

%% Writes one directive result line to the logger. Reply shapes other than
%% the ones matched below crash the caller, as the original code did.
log_directive_reply(LoggerPid, {ok, RawReq, ok}, FlightInfo) ->
    iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, <<"OK">>, FlightInfo]);
log_directive_reply(LoggerPid, {ok, RawReq, {ok, Response}}, FlightInfo) when is_binary(Response) ->
    iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, Response, FlightInfo]);
log_directive_reply(LoggerPid, {ok, RawReq, {error, Reason}}, FlightInfo) ->
    iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, reason_to_binary(Reason), FlightInfo]);
log_directive_reply(LoggerPid, {error, RawReq, Error}, FlightInfo) when is_binary(Error) ->
    iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error, FlightInfo]).

%% Renders an error reason as a binary suitable for the log line.
reason_to_binary(Reason) when is_atom(Reason) -> atom_to_binary(Reason);
reason_to_binary(Reason) when is_binary(Reason) -> Reason;
reason_to_binary(_Reason) -> <<"Unknow error">>.

%% Masks the `password' entry of a property list before it is logged;
%% anything that is not a list is returned unchanged.
redact_password(Props) when is_list(Props) ->
    lists:keyreplace(password, 1, Props, {password, <<"******">>});
redact_password(Props) ->
    Props.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. When an MQTT connection pid is held, the subscribed topics are
%% unsubscribed and the connection is closed on a best-effort basis: a
%% failure of either step is logged instead of crashing terminate/2.
%% The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
    lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
    %% Unsubscribe from the topics before disconnecting.
    TopicNames = [Name || {Name, _} <- ?Topics],
    try
        UnsubscribeResult = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
        DisconnectResult = emqtt:disconnect(ConnPid),
        lager:debug("[iot_zd_consumer] unsubscribe result: ~p, disconnect result: ~p", [UnsubscribeResult, DisconnectResult])
    catch
        Class:Error ->
            %% The connection process may already be gone, for example when
            %% the server stops because of a received disconnect message.
            lager:warning("[iot_zd_consumer] cleanup of mqtt connection failed: ~p:~p", [Class, Error])
    end,
    ok;
terminate(Reason, _State) ->
    lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
    ok.
%% @private
%% @doc Hot code upgrade hook. The state layout is the same across versions,
%% so the current state is handed back unmodified.
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_FromVsn, #state{} = Current, _Extra) ->
    {ok, Current}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Resolves the target host of a decoded directive and reports failures.
%%
%% The first argument is the decoded directive; it must be a map holding
%% `version', `location_code' (a binary) and `properties'. RawReq is the raw
%% request and is only echoed back in the reply message.
%% On every failure path a `{directive_reply, {error, RawReq, Message}}'
%% message is sent to the calling process. Anything that is not a well-formed
%% directive, including a non-binary location code, is reported as
%% "unknown directive".
publish_directive(#{<<"version">> := _Version, <<"location_code">> := LocationCode, <<"properties">> := _DirectiveParams}, RawReq) when is_binary(LocationCode) ->
    %% Look up the host and device uuid stored in redis under LocationCode.
    ReceiverPid = self(),
    case redis_client:hgetall(LocationCode) of
        {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := _DeviceUUID}} ->
            case iot_host:get_pid(HostUUID) of
                undefined ->
                    ReceiverPid ! {directive_reply, {error, RawReq, <<"host uuid: ", HostUUID/binary, " not found">>}};
                Pid when is_pid(Pid) ->
                    %% NOTE(review): no directive_reply is sent on this path,
                    %% so the caller's in-flight counter is not decremented
                    %% for directives whose host is found - confirm whether
                    %% this is intended for the mock.
                    ok
            end;
        {ok, Map} when is_map(Map) ->
            RedisData = iolist_to_binary(jiffy:encode(Map, [force_utf8])),
            ReceiverPid ! {directive_reply, {error, RawReq, <<"invalid redis data: ", RedisData/binary>>}};
        _ ->
            ReceiverPid ! {directive_reply, {error, RawReq, <<"location_code: ", LocationCode/binary, " not found in redis">>}}
    end;
publish_directive(Other, RawReq) ->
    lager:warning("[iot_zd_consumer] get a error message: ~p", [Other]),
    self() ! {directive_reply, {error, RawReq, <<"unknown directive">>}}.
%% Starts an emqtt client linked to the calling process, connects it and
%% subscribes to ?Topics.
%%
%% Props is the property list with the connection settings (`host', `port',
%% `username', `password', `keepalive'); `port' defaults to 18080 and
%% `keepalive' to 86400.
%% Returns `{ok, ConnPid}' once connected, or `{error, Reason}' when the
%% client process cannot be started. A failed connect raises a badmatch,
%% which the caller is expected to catch. The subscribe result is only
%% logged, not checked.
-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
create_consumer(Props) when is_list(Props) ->
    Node = atom_to_binary(node()),
    %% The client id embeds the node name.
    ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt_consumer">>,
    %% Connection settings for the emqx server.
    Host = proplists:get_value(host, Props),
    Port = proplists:get_value(port, Props, 18080),
    Username = proplists:get_value(username, Props),
    Password = proplists:get_value(password, Props),
    Keepalive = proplists:get_value(keepalive, Props, 86400),
    Opts = [
        {clientid, ClientId},
        {host, Host},
        {port, Port},
        {owner, self()},
        {tcp_opts, []},
        {username, Username},
        {password, Password},
        {keepalive, Keepalive},
        {auto_ack, true},
        {connect_timeout, 5000},
        {proto_ver, v5},
        {retry_interval, 5000}
    ],
    %% Log the options with the password masked so that credentials do not
    %% end up in the log.
    LoggedOpts = lists:keyreplace(password, 1, Opts, {password, <<"******">>}),
    lager:debug("[iot_zd_consumer] opts is: ~p", [LoggedOpts]),
    case emqtt:start_link(Opts) of
        {ok, ConnPid} ->
            lager:debug("[iot_zd_consumer] start conntecting, pid: ~p", [ConnPid]),
            {ok, _} = emqtt:connect(ConnPid),
            lager:debug("[iot_zd_consumer] connect success, pid: ~p", [ConnPid]),
            %% Subscribe to the topics listed in ?Topics.
            SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
            lager:debug("[iot_zd_consumer] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
            {ok, ConnPid};
        ignore ->
            {error, ignore};
        {error, Reason} ->
            {error, Reason}
    end.