diff --git a/apps/iot/src/endpoint/iot_school_lab_endpoint.erl b/apps/iot/src/endpoint/iot_school_lab_endpoint.erl new file mode 100644 index 0000000..25988a9 --- /dev/null +++ b/apps/iot/src/endpoint/iot_school_lab_endpoint.erl @@ -0,0 +1,268 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% 实验室数据推送 +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(iot_school_lab_endpoint). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_statem). + +%% API +-export([start_link/0]). +-export([get_pid/0, forward/4, get_stat/0, get_num/1]). + +%% gen_statem callbacks +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +%% 最大数据缓冲量 +-define(MAX_QUEUE_SIZE, 5_000_000). + +-record(state, { + mqtt_opts = [], + postman_pid :: undefined | pid(), + logger_pid :: pid(), + + %% 数据缓存队列 + iot_queue, + + %% 定时器 + timer_ref :: undefined | reference(), + %% 是否繁忙 + is_busy = false :: boolean(), + + %% 记录成功处理的消息数, 记日期和总数的映射关系 + acc_num = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_pid() -> undefined | pid(). +get_pid() -> + whereis(?MODULE). + +-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}). + +-spec get_stat() -> {ok, Stat :: #{}}. +get_stat() -> + gen_statem:call(?MODULE, get_stat, 5000). + +-spec get_num(Date :: string()) -> {ok, Num :: integer()}. +get_num(Date) when is_list(Date) -> + gen_statem:call(?MODULE, {get_num, Date}, 5000). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([]) -> + {ok, Opts} = application:get_env(iot, school_lab), + + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + %% 启动日志记录器 + {ok, LoggerPid} = iot_logger:start_link("lab_data"), + + {ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined, logger_pid = LoggerPid}}. + +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc There should be one instance of this function for each possible +%% state name. If callback_mode is state_functions, one of these +%% functions is called when gen_statem receives and event from +%% call/2, cast/2, or as a normal process message. + +handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, iot_queue = Q}) -> + case iot_queue:is_full(Q) of + true -> + lager:notice("[iot_lab_endpoint] queue is full discard data: ~p", [{LocationCode, DynamicLocationCode, Fields, Timestamp}]), + {keep_state, State}; + false -> + case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of + {ok, Body} -> + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso not IsBusy of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State#state{iot_queue = iot_queue:in(Body, Q)}, Actions}; + error -> + {keep_state, State} + end + end; + +%% 触发读取下一条数据 +handle_event(info, fetch_next, disconnected, State) -> + lager:debug("[iot_lab_endpoint] fetch_next postman offline, data in queue"), + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, iot_queue = Q}) -> + case iot_queue:out(Q) of + {{value, Body}, Q1} -> + PostmanPid ! {post, self(), Body}, + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}), + + {keep_state, State#state{iot_queue = Q1, timer_ref = TimerRef, is_busy = true}}; + {empty, _} -> + {keep_state, State} + end; + +%% 收到确认消息 +handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) -> + %% 记录日志信息 + iot_logger:write(LoggerPid, AssocMessage), + + Actions = case StateName =:= connected of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + + Date = iot_util:date(), + Num = maps:get(Date, AccNum, 0), + NAccNum = AccNum#{Date => Num + 1}, + + {keep_state, State#state{timer_ref = undefined, acc_num = NAccNum, is_busy = false}, Actions}; + +%% 收到重发过期请求 +handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) -> + lager:debug("[iot_lab_endpoint] repost data: ~p", [Body]), + PostmanPid ! {post, self(), Body}, + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}), + + {keep_state, State#state{timer_ref = TimerRef, is_busy = true}}; + +%% 离线时,忽略超时逻辑 +handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> + {keep_state, State}; + +handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mqtt_opts = Opts}) -> + lager:debug("[iot_lab_endpoint] create postman"), + try + {ok, PostmanPid} = create_postman(Opts), + {next_state, connected, State#state{postman_pid = PostmanPid, timer_ref = undefined, is_busy = false}, [{next_event, info, fetch_next}]} + catch _:Error:Stack -> + lager:warning("[iot_lab_endpoint] config: ~p, create postman get error: ~p, stack: ~p", [Opts, Error, Stack]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {keep_state, State#state{postman_pid = undefined}} + end; + +%% 获取当前统计信息 +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, iot_queue = Q}) -> + Stat = #{ + <<"acc_num">> => AccNum, + <<"queue_num">> => iot_queue:len(Q), + <<"state_name">> => atom_to_binary(StateName) + }, + {keep_state, State, [{reply, From, Stat}]}; + +%% 获取当前统计信息 +handle_event({call, From}, {get_num, Date}, _StateName, State = #state{acc_num = AccNum}) -> + {keep_state, State, [{reply, From, maps:get(Date, AccNum, 0)}]}; + +%% postman进程挂掉时,重新建立新的 +handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{timer_ref = TimerRef, postman_pid = PostmanPid}) -> + lager:warning("[iot_lab_endpoint] postman exited with reason: ~p", [Reason]), + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {next_state, disconnected, State#state{timer_ref = undefined, postman_pid = undefined}}; + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event(EventType, Event, StateName, State) -> + lager:warning("[iot_lab_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), + {keep_state, State}. + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, #state{}) -> + lager:debug("[iot_lab_endpoint] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 对mqtt协议的支持, 只需要建立单个链接 +create_postman(Opts) -> + Host = proplists:get_value(host, Opts), + Port = proplists:get_value(port, Opts), + Username = proplists:get_value(username, Opts), + Password = proplists:get_value(password, Opts), + Topic0 = proplists:get_value(topic, Opts), + Qos = proplists:get_value(qos, Opts), + + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-lab_mqtt">>, + PostmanOpts = [ + {clientid, ClientId}, + {host, Host}, + {port, Port}, + {tcp_opts, []}, + {username, Username}, + {password, Password}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + mqtt_postman:start_link(PostmanOpts, list_to_binary(Topic0), Qos). + +-spec format_data(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list() | binary(), Timestamp :: integer()) -> {ok, Body :: binary()} | error. +format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + Data = #{ + <<"version">> => <<"1.0">>, + <<"location_code">> => LocationCode, + <<"dynamic_location_code">> => DynamicLocationCode, + <<"ts">> => Timestamp, + <<"properties">> => Fields + }, + try + Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), + {ok, Body} + catch _:Reason -> + lager:warning("[iot_zd_endpoint] location_code: ~p, format_data get error: ~p", [LocationCode, Reason]), + error + end. \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index cfa7ceb..49cb86f 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -59,6 +59,16 @@ {qos, 2} ]}, + %% 配置实验室电表的数据转发, mqtt协议 + {school_lab, [ + {host, "39.98.184.67"}, + {port, 1883}, + {username, "test"}, + {password, "test1234"}, + {topic, "CET/NX/upload"}, + {qos, 2} + ]}, + %% 金智调度系统 {jinzhi, [ {pri_key, "jinzhi_pri.key"}, diff --git a/docs/todo.txt b/docs/todo.txt index 9e1bdfb..20b587a 100644 --- a/docs/todo.txt +++ b/docs/todo.txt @@ -3,8 +3,5 @@ 1. 事件需要分离,需要通过post发送到对端; 点位信息需要替换成中文, event_code也需要 2. 有文档信息,按照文档 - -1. 需要增加时间限制,多次上报,5分钟内只报一次;基于事件id来做过滤 -2. 实验室的电表推送 todo mqtt - - 实现一类推送器!! http的 mqtt的 \ No newline at end of file +1. 需要增加时间限制,多次上报,5分钟内只报一次;基于事件id来做过滤 done! +2. 实验室的电表推送 todo mqtt \ No newline at end of file