add lab mqtt endpoint

This commit is contained in:
anlicheng 2024-12-14 23:09:24 +08:00
parent 31753ce4c6
commit e6cabce4f0
3 changed files with 280 additions and 5 deletions

View File

@ -0,0 +1,268 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_school_lab_endpoint).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/0]).
-export([get_pid/0, forward/4, get_stat/0, get_num/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%%
-define(RETRY_INTERVAL, 5000).
%%
-define(MAX_QUEUE_SIZE, 5_000_000).
-record(state, {
mqtt_opts = [],
postman_pid :: undefined | pid(),
logger_pid :: pid(),
%%
iot_queue,
%%
timer_ref :: undefined | reference(),
%%
is_busy = false :: boolean(),
%% ,
acc_num = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid() -> undefined | pid().
get_pid() ->
whereis(?MODULE).
-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}).
-spec get_stat() -> {ok, Stat :: #{}}.
get_stat() ->
gen_statem:call(?MODULE, get_stat, 5000).
-spec get_num(Date :: string()) -> {ok, Num :: integer()}.
get_num(Date) when is_list(Date) ->
gen_statem:call(?MODULE, {get_num, Date}, 5000).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([]) ->
{ok, Opts} = application:get_env(iot, school_lab),
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), create_postman),
%%
{ok, LoggerPid} = iot_logger:start_link("lab_data"),
{ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined, logger_pid = LoggerPid}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, iot_queue = Q}) ->
case iot_queue:is_full(Q) of
true ->
lager:notice("[iot_lab_endpoint] queue is full discard data: ~p", [{LocationCode, DynamicLocationCode, Fields, Timestamp}]),
{keep_state, State};
false ->
case format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) of
{ok, Body} ->
%%
Actions = case StateName =:= connected andalso not IsBusy of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{iot_queue = iot_queue:in(Body, Q)}, Actions};
error ->
{keep_state, State}
end
end;
%%
handle_event(info, fetch_next, disconnected, State) ->
lager:debug("[iot_lab_endpoint] fetch_next postman offline, data in queue"),
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{is_busy = true}) ->
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, iot_queue = Q}) ->
case iot_queue:out(Q) of
{{value, Body}, Q1} ->
PostmanPid ! {post, self(), Body},
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}),
{keep_state, State#state{iot_queue = Q1, timer_ref = TimerRef, is_busy = true}};
{empty, _} ->
{keep_state, State}
end;
%%
handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) ->
%%
iot_logger:write(LoggerPid, AssocMessage),
Actions = case StateName =:= connected of
true -> [{next_event, info, fetch_next}];
false -> []
end,
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
Date = iot_util:date(),
Num = maps:get(Date, AccNum, 0),
NAccNum = AccNum#{Date => Num + 1},
{keep_state, State#state{timer_ref = undefined, acc_num = NAccNum, is_busy = false}, Actions};
%%
handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) ->
lager:debug("[iot_lab_endpoint] repost data: ~p", [Body]),
PostmanPid ! {post, self(), Body},
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, Body}),
{keep_state, State#state{timer_ref = TimerRef, is_busy = true}};
%% 线
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
{keep_state, State};
handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mqtt_opts = Opts}) ->
lager:debug("[iot_lab_endpoint] create postman"),
try
{ok, PostmanPid} = create_postman(Opts),
{next_state, connected, State#state{postman_pid = PostmanPid, timer_ref = undefined, is_busy = false}, [{next_event, info, fetch_next}]}
catch _:Error:Stack ->
lager:warning("[iot_lab_endpoint] config: ~p, create postman get error: ~p, stack: ~p", [Opts, Error, Stack]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{keep_state, State#state{postman_pid = undefined}}
end;
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, iot_queue = Q}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"queue_num">> => iot_queue:len(Q),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%%
handle_event({call, From}, {get_num, Date}, _StateName, State = #state{acc_num = AccNum}) ->
{keep_state, State, [{reply, From, maps:get(Date, AccNum, 0)}]};
%% postman进程挂掉时
handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{timer_ref = TimerRef, postman_pid = PostmanPid}) ->
lager:warning("[iot_lab_endpoint] postman exited with reason: ~p", [Reason]),
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{next_state, disconnected, State#state{timer_ref = undefined, postman_pid = undefined}};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_lab_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{}) ->
lager:debug("[iot_lab_endpoint] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% mqtt协议的支持,
create_postman(Opts) ->
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
Username = proplists:get_value(username, Opts),
Password = proplists:get_value(password, Opts),
Topic0 = proplists:get_value(topic, Opts),
Qos = proplists:get_value(qos, Opts),
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-lab_mqtt">>,
PostmanOpts = [
{clientid, ClientId},
{host, Host},
{port, Port},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, 86400},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{retry_interval, 5000}
],
mqtt_postman:start_link(PostmanOpts, list_to_binary(Topic0), Qos).
-spec format_data(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list() | binary(), Timestamp :: integer()) -> {ok, Body :: binary()} | error.
format_data(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
Data = #{
<<"version">> => <<"1.0">>,
<<"location_code">> => LocationCode,
<<"dynamic_location_code">> => DynamicLocationCode,
<<"ts">> => Timestamp,
<<"properties">> => Fields
},
try
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
{ok, Body}
catch _:Reason ->
lager:warning("[iot_zd_endpoint] location_code: ~p, format_data get error: ~p", [LocationCode, Reason]),
error
end.

View File

@ -59,6 +59,16 @@
{qos, 2} {qos, 2}
]}, ]},
%% 配置实验室电表的数据转发, mqtt协议
{school_lab, [
{host, "39.98.184.67"},
{port, 1883},
{username, "test"},
{password, "test1234"},
{topic, "CET/NX/upload"},
{qos, 2}
]},
%% 金智调度系统 %% 金智调度系统
{jinzhi, [ {jinzhi, [
{pri_key, "jinzhi_pri.key"}, {pri_key, "jinzhi_pri.key"},

View File

@ -3,8 +3,5 @@
1. 事件需要分离需要通过post发送到对端 点位信息需要替换成中文, event_code也需要 1. 事件需要分离需要通过post发送到对端 点位信息需要替换成中文, event_code也需要
2. 有文档信息,按照文档 2. 有文档信息,按照文档
1. 需要增加时间限制多次上报5分钟内只报一次基于事件id来做过滤 done!
1. 需要增加时间限制多次上报5分钟内只报一次基于事件id来做过滤 2. 实验室的电表推送 todo mqtt
2. 实验室的电表推送 todo mqtt
实现一类推送器!! http的 mqtt的