%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023,
%%% @doc
%%% gen_statem that queues north-bound data and hands it, one message at
%%% a time, to an MQTT postman process. A message is re-posted every
%%% ?RETRY_INTERVAL milliseconds until an `ack' for it is received.
%%% @end
%%% Created : 06. Jul 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_zd_endpoint).
-author("aresei").
-include("iot.hrl").

-behaviour(gen_statem).

%% API
-export([start_link/0]).
-export([get_pid/0, forward/3, get_stat/0]).

%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).

%% Interval (in milliseconds) between two attempts to post the same message
-define(RETRY_INTERVAL, 5000).

-record(state, {
    mqtt_opts = [],
    postman_pid :: undefined | pid(),
    logger_pid :: pid(),

    %% Cursor of the current data, i.e. the id of a #north_data
    cursor = 0 :: integer(),
    %% Reference of the running resend timer, if any
    timer_ref :: undefined | reference(),
    %% Whether a message is currently in flight (posted but not yet acked)
    is_busy = false :: boolean(),

    %% Number of messages that have been acknowledged so far
    acc_num = 0,

    %% Messages waiting to be posted
    queue = queue:new()
}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Returns the pid of the locally registered endpoint process, or
%% `undefined' when it is not running.
-spec get_pid() -> undefined | pid().
get_pid() ->
    whereis(?MODULE).

%% @doc Asynchronously enqueues one datum for delivery.
%% All three arguments are validated: `LocationCode' must be a binary,
%% `Fields' a list or a binary, and `Timestamp' an integer. Any other
%% combination raises `function_clause'.
-spec forward(LocationCode :: binary(), Fields :: list() | binary(), Timestamp :: integer()) -> ok.
forward(LocationCode, Fields, Timestamp)
    when is_binary(LocationCode),
         (is_list(Fields) orelse is_binary(Fields)),
         is_integer(Timestamp) ->
    %% NOTE: `,' is AND and `;' separates alternative guard sequences, so the
    %% list-or-binary alternative has to be a single `orelse' expression.
    gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}).

%% @doc Synchronously fetches the endpoint statistics (5 second timeout).
%% The reply is the stat map itself, as sent by the `get_stat' call handler.
-spec get_stat() -> map().
get_stat() ->
    gen_statem:call(?MODULE, get_stat, 5000).

%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================

%% @private
%% @doc Called by the new process when the gen_statem is started.
%% Reads the `zhongdian' options from the `iot' application environment,
%% starts the postman and the "north_data_fix" logger, and begins in the
%% `connected' state. Any failing step crashes the start-up (assertive match).
init([]) ->
    {ok, MqttOpts} = application:get_env(iot, zhongdian),
    {ok, Postman} = create_postman(MqttOpts),
    %% Start the logger
    {ok, Logger} = iot_logger:start_link("north_data_fix"),
    InitialState = #state{
        mqtt_opts = MqttOpts,
        postman_pid = Postman,
        logger_pid = Logger
    },
    {ok, connected, InitialState}.

%% @private
%% @doc Selects the callback mode: every event, in every state, is routed
%% through handle_event/4.
callback_mode() ->
    handle_event_function.

%% @private
%% @doc Handles all events (calls, casts and plain process messages);
%% the current state name is passed as the third argument.
%% Enqueue a new datum; it is posted later by a `fetch_next' event.
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) ->
    Q1 = queue:in(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}, Q),
    %% Avoid unnecessary internal messages: only trigger a fetch when
    %% connected and nothing is in flight
    Actions = case StateName =:= connected andalso not IsBusy of
        true -> [{next_event, info, fetch_next}];
        false -> []
    end,
    {keep_state, State#state{queue = Q1}, Actions};

%% Trigger reading the next datum; a no-op while a message is in flight
handle_event(info, fetch_next, connected, State = #state{is_busy = true}) ->
    {keep_state, State};
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) ->
    case queue:out(Q) of
        {{value, NorthData}, Q1} ->
            lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]),
            do_post(PostmanPid, NorthData),
            %% Re-post the same datum if no ack arrives within ?RETRY_INTERVAL
            TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
            {keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}};
        {empty, Q1} ->
            {keep_state, State#state{queue = Q1}}
    end;

%% Acknowledgement received: log it, stop the resend timer, count the
%% message and (when connected) move on to the next one.
%% NOTE(review): the ack Id is not checked against the in-flight datum, so a
%% late ack for a re-posted message is treated as an ack for the current one.
handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) ->
    %% Record the log information
    iot_logger:write(LoggerPid, AssocMessage),

    lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
    Actions = case StateName =:= connected of
        true -> [{next_event, info, fetch_next}];
        false -> []
    end,
    %% The timer may already have fired; its timeout message is then still
    %% in the mailbox and is discarded by the stale-timeout clause below.
    is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),

    {keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions};

%% Resend timer expired for the message currently in flight: post it again.
%% The timer reference must equal the one stored in the state; otherwise the
%% timeout belongs to a message that has already been acknowledged.
handle_event(info, {timeout, TimerRef, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid, timer_ref = TimerRef}) ->
    lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]),
    do_post(PostmanPid, NorthData),
    NewTimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),

    {keep_state, State#state{timer_ref = NewTimerRef}};

%% Stale resend timeout (fired before it could be cancelled): ignore it, so
%% acknowledged data is not re-posted and the live timer_ref is not clobbered
handle_event(info, {timeout, _StaleRef, {repost_ticker, _}}, connected, State) ->
    {keep_state, State};

%% While offline, ignore the resend timeout
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
    {keep_state, State};

%% Return the current statistics: acknowledged count and state name
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
    Stat = #{
        <<"acc_num">> => AccNum,
        <<"state_name">> => atom_to_binary(StateName)
    },
    {keep_state, State, [{reply, From, Stat}]};

%% Catch-all: log and drop any event not handled above
handle_event(EventType, Event, StateName, State) ->
    %% mqtt_opts holds the MQTT password, so it is redacted before logging
    SafeState = State#state{mqtt_opts = '<redacted>'},
    lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, SafeState]),

    {keep_state, State}.

%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{}) ->
    lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]),
    ok.

%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
    {ok, StateName, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% MQTT support: only a single connection needs to be established.
%% Builds the postman options from the configured proplist and starts the
%% postman with the configured topic and QoS.
create_postman(Opts) ->
    Host = proplists:get_value(host, Opts),
    Port = proplists:get_value(port, Opts),
    Username = proplists:get_value(username, Opts),
    Password = proplists:get_value(password, Opts),
    Topic = proplists:get_value(topic, Opts),
    Qos = proplists:get_value(qos, Opts),

    %% The client id embeds the node name
    Node = atom_to_binary(node()),
    ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt">>,
    PostmanOpts = [
        {clientid, ClientId},
        {host, Host},
        {port, Port},
        {tcp_opts, []},
        {username, Username},
        {password, Password},
        {keepalive, 86400},
        {auto_ack, true},
        {connect_timeout, 5000},
        {proto_ver, v5},
        {retry_interval, 5000}
    ],

    mqtt_postman:start_link(PostmanOpts, Topic, Qos).

%% Encodes one #north_data as JSON and sends it to the postman as a
%% `{post, self(), #post_data{}}' message.
%% If the JSON encoding fails the datum cannot be delivered, so the failure
%% is logged and an `ack' with the text "json error" is sent to self();
%% the ack handler then moves on instead of retrying the datum forever.
%% Always returns `ok'.
-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> ok.
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
    Data = #{
        <<"version">> => <<"1.0">>,
        <<"location_code">> => LocationCode,
        <<"ts">> => Timestamp,
        <<"properties">> => Fields
    },
    %% Only the encoding is protected; the send in the `of' branch is not
    try iolist_to_binary(jiffy:encode(Data, [force_utf8])) of
        Body ->
            PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
            ok
    catch
        Class:Reason ->
            lager:warning("[iot_zd_endpoint] json encode failed, id: ~p, error: ~p:~p", [Id, Class, Reason]),
            self() ! {ack, Id, <<"json error">>},
            ok
    end.