%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, 
%%% @doc
%%% 1. Cluster deployment has to be taken into account: uplink data may be
%%%    shared across the cluster.
%%% 2. Host processes must not subscribe to the topic directly, because that
%%%    raises many problems when hosts are added or go offline.
%%% @end
%%% Created : 12. Mar 2023 21:27
%%%-------------------------------------------------------------------
-module(iot_mqtt_consumer).
-author("aresei").
-include("iot.hrl").

-behaviour(gen_server).

%% API
-export([start_link/0]).
-export([mock/5]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
%% Delay in milliseconds before the consumer creation is retried
-define(RETRY_INTERVAL, 5000).

%% Execution timeout
-define(EXECUTE_TIMEOUT, 10 * 1000).

%% Topics to subscribe to, as {TopicName, QoS} pairs
-define(Topics, [
    {<<"CET/NX/download">>, 2}
]).

-record(state, {
    conn_pid :: undefined | pid(),
    logger_pid :: pid(),
    mqtt_props :: list(),
    %% number of tasks currently being executed
    flight_num = 0
}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Builds a "ctrl" directive request for `LocationCode' and hands it to
%% the registered consumer process through a synchronous call, as if the
%% request had been received from the broker.
mock(LocationCode, Para, SType, CType, Value)
    when is_binary(LocationCode), is_integer(SType), is_integer(CType), is_integer(Para) ->
    Ts = iot_util:current_time(),
    Properties = #{
        <<"type">> => <<"ctrl">>,
        <<"para">> => Para,
        <<"stype">> => SType,
        <<"ctype">> => CType,
        <<"value">> => Value,
        <<"timestamp">> => iot_util:current_time()
    },
    Directive = #{
        <<"version">> => <<"1.0">>,
        <<"ts">> => Ts,
        <<"properties">> => Properties,
        <<"location_code">> => LocationCode
    },
    gen_server:call(?SERVER, {mock, Directive}).

%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
%% @doc Initializes the server.
%% Traps exits, reads the `zhongdian' settings from the `iot' application
%% environment, schedules the creation of the MQTT consumer and starts the
%% directive logger.
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([]) ->
    erlang:process_flag(trap_exit, true),
    {ok, Props} = application:get_env(iot, zhongdian),
    %% Create the consumer through a zero-delay timer so that establishing
    %% the connection does not block the start-up of this process
    %% (deferred initialization).
    erlang:start_timer(0, self(), create_consumer),
    %% Start the logger
    {ok, LoggerPid} = iot_logger:start_link("zd_directive_data"),
    {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid}}.

%% @private
%% @doc Handling call messages.
%% `{mock, Request}' injects a directive locally. It replies `ok' once the
%% directive has been handed to publish_directive/2, or
%% `{error, not_connected}' while no MQTT connection is established.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_call({mock, Request}, _From, State = #state{conn_pid = ConnPid, flight_num = FlightNum}) when is_pid(ConnPid) ->
    publish_directive(Request, jiffy:encode(Request, [force_utf8])),
    {reply, ok, State#state{flight_num = FlightNum + 1}};
%% The connection is down (initial state or waiting for a retry). Reply with
%% an error instead of crashing the whole consumer with function_clause.
handle_call({mock, _Request}, _From, State = #state{conn_pid = undefined}) ->
    {reply, {error, not_connected}, State}.

%% @private
%% @doc Handling cast messages. Every cast is ignored.
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
    {noreply, State}.

%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% The broker sent a DISCONNECT packet: stop the server.
handle_info({disconnect, ReasonCode, Properties}, State) ->
    lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
    {stop, disconnected, State};
%% Messages have to be dispatched quickly; the JSON deserialization of the
%% data is meant to be done in the host process.
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := 2, topic := Topic}}, State = #state{flight_num = FlightNum}) ->
    lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: 2", [Topic, Payload]),
    %% A payload that cannot be decoded is turned into a tagged tuple; it does
    %% not match the directive map expected by publish_directive/2 and is
    %% therefore reported through its fallback clause.
    Request =
        try
            jiffy:decode(Payload, [return_maps])
        catch
            Class:DecodeError ->
                {decode_error, Class, DecodeError}
        end,
    publish_directive(Request, Payload),
    {noreply, State#state{flight_num = FlightNum + 1}};
%% Only QoS 2 messages are processed; anything else is logged and dropped.
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
    lager:notice("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p, qos is error", [Topic, Payload, Qos]),
    {noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
    lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]),
    {noreply, State};
%% (Re)create the MQTT consumer. Any failure schedules another attempt after
%% ?RETRY_INTERVAL milliseconds.
handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, conn_pid = undefined}) ->
    try
        {ok, ConnPid} = create_consumer(Props),
        {noreply, State#state{conn_pid = ConnPid}}
    catch _:Error:Stack ->
        %% NOTE(review): Props is logged verbatim and may contain the MQTT
        %% password read by create_consumer/1 - consider redacting it.
        lager:warning("[iot_zd_consumer] config: ~p, create consumer get error: ~p, stack: ~p", [Props, Error, Stack]),
        erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
        {noreply, State#state{conn_pid = undefined}}
    end;
%% When the connection process dies, schedule the creation of a new one.
handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
    lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]),
    erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
    {noreply, State#state{conn_pid = undefined}};
%% When the logger dies, start a new one. The new pid must be bound to a
%% fresh variable: matching it against the already bound pid of the dead
%% logger would always fail with badmatch.
handle_info({'EXIT', LoggerPid, Reason}, State = #state{logger_pid = LoggerPid}) ->
    lager:warning("[iot_zd_consumer] logger exited with reason: ~p", [Reason]),
    {ok, NewLoggerPid} = iot_logger:start_link("zd_directive_data"),
    {noreply, State#state{logger_pid = NewLoggerPid}};
%% Outcome of a directive: write it to the directive log together with the
%% number of directives still in flight, then decrement the counter.
handle_info({directive_reply, Reply}, State = #state{logger_pid = LoggerPid, flight_num = FlightNum}) ->
    FlightInfo = <<"flight_num: ", (integer_to_binary(FlightNum - 1))/binary>>,
    case Reply of
        {ok, RawReq, ok} ->
            iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, <<"OK">>, FlightInfo]);
        {ok, RawReq, {ok, Response}} when is_binary(Response) ->
            iot_logger:write(LoggerPid, [<<"[success]">>, RawReq, Response, FlightInfo]);
        {ok, RawReq, {error, Reason0}} ->
            Reason =
                if
                    is_atom(Reason0) -> atom_to_binary(Reason0);
                    is_binary(Reason0) -> Reason0;
                    true -> <<"Unknow error">>
                end,
            iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Reason, FlightInfo]);
        {error, RawReq, Error} when is_binary(Error) ->
            iot_logger:write(LoggerPid, [<<"[error]">>, RawReq, Error, FlightInfo])
    end,
    {noreply, State#state{flight_num = FlightNum - 1}};
handle_info(Info, State = #state{}) ->
    lager:notice("[iot_zd_consumer] get a unknown info: ~p", [Info]),
    {noreply, State}.

%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%% While a connection process is known, the topics are unsubscribed and the
%% connection is closed first.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
    %% Cancel the topic subscriptions
    TopicNames = [Name || {Name, _Qos} <- ?Topics],
    {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
    ok = emqtt:disconnect(ConnPid),
    lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
    ok;
terminate(Reason, _State) ->
    lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
    ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Resolves the host that belongs to the directive's location code and
%% reports every failure back to the calling process as a
%% {directive_reply, {error, RawReq, Message}} message.
publish_directive(#{<<"version">> := _Version, <<"location_code">> := LocationCode, <<"properties">> := _DirectiveParams}, RawReq) ->
    %% Look up the host and the device uuid by the location code
    Lookup = redis_client:hgetall(LocationCode),
    case Lookup of
        {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := _DeviceUUID}} ->
            case iot_host:get_pid(HostUUID) of
                undefined ->
                    reply_directive_error(RawReq, <<"host uuid: ", HostUUID/binary, " not found">>);
                HostPid when is_pid(HostPid) ->
                    %% NOTE(review): nothing is sent to the host here and no
                    %% directive_reply is produced, so the flight counter of
                    %% the caller is not decremented on this path - confirm
                    %% whether this is intended.
                    ok
            end;
        {ok, Fields} when is_map(Fields) ->
            RedisData = iolist_to_binary(jiffy:encode(Fields, [force_utf8])),
            reply_directive_error(RawReq, <<"invalid redis data: ", RedisData/binary>>);
        _ ->
            reply_directive_error(RawReq, <<"location_code: ", LocationCode/binary, " not found in redis">>)
    end;
publish_directive(Other, RawReq) ->
    lager:warning("[iot_zd_consumer] get a error message: ~p", [Other]),
    reply_directive_error(RawReq, <<"unknown directive">>).

%% Sends a failed directive outcome to the current process.
reply_directive_error(RawReq, Message) ->
    self() ! {directive_reply, {error, RawReq, Message}}.

-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
%% Starts an emqtt client linked to the calling process, connects it to the
%% broker described by Props and subscribes to ?Topics.
%% Returns {ok, ConnPid} on success and {error, Reason} when the client
%% process cannot be started; a failed connect raises badmatch.
create_consumer(Props) when is_list(Props) ->
    Node = atom_to_binary(node()),
    ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt_consumer">>,
    %% Options for the connection to the emqx server
    Opts = [
        {clientid, ClientId},
        {host, proplists:get_value(host, Props)},
        {port, proplists:get_value(port, Props, 18080)},
        {owner, self()},
        {tcp_opts, []},
        {username, proplists:get_value(username, Props)},
        {password, proplists:get_value(password, Props)},
        {keepalive, proplists:get_value(keepalive, Props, 86400)},
        {auto_ack, true},
        {connect_timeout, 5000},
        {proto_ver, v5},
        {retry_interval, 5000}
    ],
    %% Never write the password to the log: mask it in the logged copy only.
    LoggedOpts = lists:keyreplace(password, 1, Opts, {password, <<"******">>}),
    lager:debug("[iot_zd_consumer] opts is: ~p", [LoggedOpts]),
    %% Establish the connection to the emqx server
    case emqtt:start_link(Opts) of
        {ok, ConnPid} ->
            lager:debug("[iot_zd_consumer] start conntecting, pid: ~p", [ConnPid]),
            {ok, _} = emqtt:connect(ConnPid),
            lager:debug("[iot_zd_consumer] connect success, pid: ~p", [ConnPid]),
            %% Subscribe to the directive topics. The result is only logged,
            %% it is not checked.
            SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
            lager:debug("[iot_zd_consumer] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
            {ok, ConnPid};
        ignore ->
            {error, ignore};
        {error, Reason} ->
            {error, Reason}
    end.