简化模型
This commit is contained in:
parent
9f66f6c5ba
commit
2eb97210f4
@ -1,170 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_http_endpoint).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
-export([get_pid/1, forward/4, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
|
||||
-record(state, {
|
||||
postman_pid :: undefined | pid(),
|
||||
pool_size = 0,
|
||||
flight_num = 0,
|
||||
id = 1,
|
||||
queue :: queue:queue(),
|
||||
%% 定时器对应关系
|
||||
timer_map = #{},
|
||||
%% 记录成功处理的消息数
|
||||
acc_num = 0
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec get_pid(Name :: atom()) -> undefined | pid().
|
||||
get_pid(Name) when is_atom(Name) ->
|
||||
whereis(Name).
|
||||
|
||||
-spec forward(Pid :: pid(), LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(Pid, LocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(Pid, {forward, LocationCode, EventType, Params}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
gen_statem:call(?MODULE, get_stat, 5000).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link(Name, Opts) when is_atom(Name), is_list(Opts) ->
|
||||
gen_statem:start_link({local, Name}, ?MODULE, [Opts], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([Opts]) ->
|
||||
PoolSize = proplists:get_value(pool_size, Opts),
|
||||
{ok, PostmanPid} = broker_postman:start_link(http_postman, Opts, PoolSize),
|
||||
|
||||
{ok, connected, #state{postman_pid = PostmanPid, pool_size = PoolSize, queue = queue:new()}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
callback_mode() ->
|
||||
handle_event_function.
|
||||
|
||||
%% @private
|
||||
%% @doc There should be one instance of this function for each possible
|
||||
%% state name. If callback_mode is state_functions, one of these
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case FlightNum < PoolSize of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
|
||||
|
||||
%% 触发读取下一条数据
|
||||
handle_event(info, fetch_next, _, State = #state{postman_pid = PostmanPid, queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
|
||||
case queue:out(Q) of
|
||||
{{value, EventData = #event_data{id = Id}}, Q1} ->
|
||||
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
|
||||
do_post(PostmanPid, EventData),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
|
||||
{empty, Q1} ->
|
||||
{keep_state, State#state{queue = Q1}}
|
||||
end;
|
||||
|
||||
%% 收到确认消息
|
||||
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
|
||||
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
|
||||
case maps:take(Id, TimerMap) of
|
||||
error ->
|
||||
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
|
||||
{TimerRef, NTimerMap} ->
|
||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
|
||||
end;
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{postman_pid = PostmanPid, timer_map = TimerMap}) ->
|
||||
lager:debug("[iot_zd_endpoint] repost data: ~p", [EventData]),
|
||||
do_post(PostmanPid, EventData),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||
|
||||
%% 获取当前统计信息
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
|
||||
Stat = #{
|
||||
<<"acc_num">> => AccNum,
|
||||
<<"queue_num">> => mnesia_queue:table_size(),
|
||||
<<"state_name">> => atom_to_binary(StateName)
|
||||
},
|
||||
{keep_state, State, [{reply, From, Stat}]};
|
||||
|
||||
%% @private
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
handle_event(EventType, Event, StateName, State) ->
|
||||
lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
|
||||
{keep_state, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, _StateName, #state{}) ->
|
||||
lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return().
|
||||
do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
|
||||
Data = #{
|
||||
<<"version">> => <<"1.0">>,
|
||||
<<"event_type">> => EventType,
|
||||
<<"params">> => Params
|
||||
},
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
||||
ok.
|
||||
@ -1,273 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_jinzhi_endpoint).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_pid/0, forward/3, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
%% 系统id
|
||||
-define(SYS_ID, <<"ZNWLZJJKXT">>).
|
||||
|
||||
-record(state, {
|
||||
url :: string(),
|
||||
logger_pid :: pid(),
|
||||
pool_size = 0,
|
||||
flight_num = 0,
|
||||
pri_key :: public_key:private_key(),
|
||||
id = 1,
|
||||
queue :: queue:queue(),
|
||||
%% 定时器对应关系
|
||||
timer_map = #{},
|
||||
%% 记录成功处理的消息数
|
||||
acc_num = 0
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec get_pid() -> undefined | pid().
|
||||
get_pid() ->
|
||||
whereis(?MODULE).
|
||||
|
||||
-spec forward(LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(LocationCode, EventType, Params) when is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, EventType, Params}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
gen_statem:call(?MODULE, get_stat, 5000).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link() ->
|
||||
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([]) ->
|
||||
{ok, Opts} = application:get_env(iot, jinzhi),
|
||||
|
||||
PoolSize = proplists:get_value(pool_size, Opts),
|
||||
PriFile = proplists:get_value(pri_key, Opts),
|
||||
Url = proplists:get_value(url, Opts),
|
||||
|
||||
{ok, LoggerPid} = iot_logger:start_link("ai_event_data"),
|
||||
PriKey = generate_private_key(PriFile),
|
||||
|
||||
{ok, connected, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey, pool_size = PoolSize, queue = queue:new()}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
callback_mode() ->
|
||||
handle_event_function.
|
||||
|
||||
%% @private
|
||||
%% @doc There should be one instance of this function for each possible
|
||||
%% state name. If callback_mode is state_functions, one of these
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case FlightNum < PoolSize of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
|
||||
|
||||
%% 触发读取下一条数据
|
||||
handle_event(info, fetch_next, _, State = #state{queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
|
||||
case queue:out(Q) of
|
||||
{{value, EventData = #event_data{id = Id}}, Q1} ->
|
||||
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
|
||||
catch do_post(EventData, State),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
|
||||
{empty, Q1} ->
|
||||
{keep_state, State#state{queue = Q1}}
|
||||
end;
|
||||
|
||||
%% 收到确认消息
|
||||
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
|
||||
case maps:take(Id, TimerMap) of
|
||||
error ->
|
||||
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
|
||||
{TimerRef, NTimerMap} ->
|
||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
|
||||
end;
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{timer_map = TimerMap}) ->
|
||||
lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [EventData]),
|
||||
catch do_post(EventData, State),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
|
||||
|
||||
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||
|
||||
%% Task进程挂掉
|
||||
handle_event(info, {'DOWN', _MRef, process, _Pid, normal}, _, State) ->
|
||||
{keep_state, State};
|
||||
|
||||
handle_event(info, {'DOWN', _MRef, process, _Pid, Reason}, _, State) ->
|
||||
lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]),
|
||||
{keep_state, State};
|
||||
|
||||
%% 获取当前统计信息
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
|
||||
Stat = #{
|
||||
<<"acc_num">> => AccNum,
|
||||
<<"queue_num">> => mnesia_queue:table_size(),
|
||||
<<"state_name">> => atom_to_binary(StateName)
|
||||
},
|
||||
{keep_state, State, [{reply, From, Stat}]};
|
||||
|
||||
%% @private
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
handle_event(EventType, Event, StateName, State) ->
|
||||
lager:warning("[iot_jinzhi_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
|
||||
{keep_state, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, _StateName, #state{}) ->
|
||||
lager:debug("[iot_jinzhi_endpoint] terminate with reason: ~p", [Reason]),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return().
|
||||
do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventType,
|
||||
params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}},
|
||||
#state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) ->
|
||||
|
||||
% <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>,
|
||||
|
||||
Attachments = lists:map(fun(#{<<"filename">> := Filename}) ->
|
||||
{ok, FileUrl} = iot_util:file_uri(Filename),
|
||||
Name = filename:basename(FileUrl),
|
||||
#{<<"name">> => Name, <<"url">> => FileUrl}
|
||||
end, Attachments0),
|
||||
|
||||
DeviceInfo = #{
|
||||
<<"location">> => LocationCode,
|
||||
<<"category">> => EventCode,
|
||||
<<"description">> => Description,
|
||||
<<"occurrenceTime">> => Datetime,
|
||||
<<"attachments">> => Attachments
|
||||
},
|
||||
|
||||
ReqData = #{
|
||||
<<"sign">> => sign(DeviceInfo, PriKey),
|
||||
<<"sysId">> => ?SYS_ID,
|
||||
<<"deviceInfo">> => DeviceInfo
|
||||
},
|
||||
Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])),
|
||||
|
||||
lager:debug("[iot_jinzhi_endpoint] do_post url: ~p, event_type: ~p, params: ~p, location_code: ~p, body: ~p", [Url, EventType, Params, LocationCode, Body]),
|
||||
|
||||
ReceiverPid = self(),
|
||||
%% 异步提交任务
|
||||
spawn_monitor(fun() ->
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>}
|
||||
],
|
||||
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
iot_logger:write(LoggerPid, [Body, RespBody]),
|
||||
ReceiverPid ! {ack, Id};
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:warning("[iot_jinzhi_endpoint] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]);
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_jinzhi_endpoint] send body: ~p, get error is: ~p", [Body, Reason])
|
||||
end
|
||||
end).
|
||||
|
||||
-spec generate_private_key(PriFile :: string()) -> public_key:private_key().
|
||||
generate_private_key(PriFile) when is_list(PriFile) ->
|
||||
PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile,
|
||||
%% 私钥保存解析后的
|
||||
{ok, PriKeyData} = file:read_file(PriKeyFile),
|
||||
PriDerData = base64:decode(PriKeyData),
|
||||
public_key:der_decode('PrivateKeyInfo', PriDerData).
|
||||
|
||||
%% 数据签名
|
||||
-spec sign(M :: #{}, PrivateKey :: public_key:private_key()) -> binary().
|
||||
sign(M, PrivateKey) when is_map(M) ->
|
||||
Json = serialize(M),
|
||||
Hash = iolist_to_binary(io_lib:format("~64.16.0b", [binary:decode_unsigned(crypto:hash(sha256, Json))])),
|
||||
RsaEncoded = public_key:encrypt_private(Hash, PrivateKey),
|
||||
|
||||
base64:encode(RsaEncoded).
|
||||
|
||||
%% 简单的序列号,sign签名
|
||||
-spec serialize(M :: map()) -> JsonString :: binary().
|
||||
serialize(M) when is_map(M) ->
|
||||
L = maps:to_list(M),
|
||||
L1 = lists:sort(fun({K, _}, {K1, _}) -> K < K1 end, L),
|
||||
serialize(L1, []).
|
||||
serialize([], Target) ->
|
||||
B = iolist_to_binary(lists:join(<<$,>>, lists:reverse(Target))),
|
||||
<<${, B/binary, $}>>;
|
||||
serialize([{K, V}|T], Target) ->
|
||||
V1 = if
|
||||
is_integer(V) ->
|
||||
integer_to_binary(V);
|
||||
is_float(V) ->
|
||||
float_to_binary(V);
|
||||
is_binary(V) ->
|
||||
<<$", V/binary, $">>;
|
||||
is_boolean(V) andalso V ->
|
||||
<<"true">>;
|
||||
is_boolean(V) andalso not V ->
|
||||
<<"false">>;
|
||||
is_list(V) ->
|
||||
Items = lists:map(fun(E) -> serialize(E) end, V),
|
||||
V0 = iolist_to_binary(lists:join(<<$,>>, Items)),
|
||||
<<$[, V0/binary, $]>>
|
||||
end,
|
||||
Item = <<$", K/binary, $", $:, V1/binary>>,
|
||||
serialize(T, [Item|Target]).
|
||||
@ -1,260 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_zd_endpoint).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_pid/0, forward/3, get_stat/0]).
|
||||
-export([parse_json_file/0, export/0, export_n/1]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
|
||||
-record(state, {
|
||||
mqtt_opts = [],
|
||||
postman_pid :: undefined | pid(),
|
||||
logger_pid :: pid(),
|
||||
|
||||
%% 当前数据的游标, #north_data的id
|
||||
cursor = 0 :: integer(),
|
||||
%% 定时器
|
||||
timer_ref :: undefined | reference(),
|
||||
%% 是否繁忙
|
||||
is_busy = false :: boolean(),
|
||||
|
||||
%% 记录成功处理的消息数
|
||||
acc_num = 0,
|
||||
|
||||
queue = queue:new()
|
||||
}).
|
||||
|
||||
export() ->
|
||||
Lines = parse_json_file(),
|
||||
[begin
|
||||
case catch jiffy:decode(Line, [return_maps]) of
|
||||
M when is_map(M) ->
|
||||
export0(M);
|
||||
Error ->
|
||||
lager:notice("invalid data: ~p, error: ~p", [Line, Error])
|
||||
end
|
||||
end || Line <- Lines].
|
||||
|
||||
export_n(N) when N > 0 ->
|
||||
Lines0 = parse_json_file(),
|
||||
Lines = lists:sublist(Lines0, 1, N),
|
||||
[begin
|
||||
case catch jiffy:decode(Line, [return_maps]) of
|
||||
M when is_map(M) ->
|
||||
export0(M);
|
||||
Error ->
|
||||
lager:notice("invalid data: ~p, error: ~p", [Line, Error])
|
||||
end
|
||||
end || Line <- Lines].
|
||||
|
||||
export0(Fields0 = #{<<"device_uuid">> := DeviceUUID, <<"timestamp">> := Timestamp}) ->
|
||||
Fields = lists:foldl(fun(Key, M) -> maps:remove(Key, M) end, Fields0, [<<"device_uuid">>]),
|
||||
%% 查找终端设备对应的点位信息
|
||||
case redis_client:hget(DeviceUUID, <<"location_code">>) of
|
||||
{ok, undefined} ->
|
||||
lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [DeviceUUID, Fields]);
|
||||
{ok, LocationCode} when is_binary(LocationCode) ->
|
||||
iot_zd_endpoint:forward(LocationCode, [Fields], Timestamp);
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [DeviceUUID, Reason, Fields])
|
||||
end.
|
||||
|
||||
parse_json_file() ->
|
||||
File = code:priv_dir(iot) ++ "/2024-01-18-log.txt",
|
||||
{ok, Content} = file:read_file(File),
|
||||
binary:split(Content, <<$\n>>, [global, trim]).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec get_pid() -> undefined | pid().
|
||||
get_pid() ->
|
||||
whereis(?MODULE).
|
||||
|
||||
-spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
gen_statem:call(?MODULE, get_stat, 5000).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link() ->
|
||||
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([]) ->
|
||||
{ok, Opts} = application:get_env(iot, zhongdian),
|
||||
|
||||
{ok, PostmanPid} = create_postman(Opts),
|
||||
%% 启动日志记录器
|
||||
{ok, LoggerPid} = iot_logger:start_link("north_data_fix"),
|
||||
|
||||
{ok, connected, #state{mqtt_opts = Opts, postman_pid = PostmanPid, logger_pid = LoggerPid}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
callback_mode() ->
|
||||
handle_event_function.
|
||||
|
||||
%% @private
|
||||
%% @doc There should be one instance of this function for each possible
|
||||
%% state name. If callback_mode is state_functions, one of these
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) ->
|
||||
Q1 = queue:in(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}, Q),
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case StateName =:= connected andalso not IsBusy of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
{keep_state, State#state{queue = Q1}, Actions};
|
||||
|
||||
%% 触发读取下一条数据
|
||||
handle_event(info, fetch_next, connected, State = #state{is_busy = true}) ->
|
||||
{keep_state, State};
|
||||
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) ->
|
||||
case queue:out(Q) of
|
||||
{{value, NorthData}, Q1} ->
|
||||
lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]),
|
||||
do_post(PostmanPid, NorthData),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}};
|
||||
{empty, Q1} ->
|
||||
{keep_state, State#state{queue = Q1}}
|
||||
end;
|
||||
|
||||
%% 收到确认消息
|
||||
handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) ->
|
||||
%% 记录日志信息
|
||||
iot_logger:write(LoggerPid, AssocMessage),
|
||||
|
||||
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
|
||||
Actions = case StateName =:= connected of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
false -> []
|
||||
end,
|
||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||
|
||||
{keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions};
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid}) ->
|
||||
lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]),
|
||||
do_post(PostmanPid, NorthData),
|
||||
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
|
||||
|
||||
{keep_state, State#state{timer_ref = TimerRef}};
|
||||
|
||||
%% 离线时,忽略超时逻辑
|
||||
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
|
||||
{keep_state, State};
|
||||
|
||||
%% 获取当前统计信息
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
|
||||
Stat = #{
|
||||
<<"acc_num">> => AccNum,
|
||||
<<"state_name">> => atom_to_binary(StateName)
|
||||
},
|
||||
{keep_state, State, [{reply, From, Stat}]};
|
||||
|
||||
%% @private
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
handle_event(EventType, Event, StateName, State) ->
|
||||
lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
|
||||
{keep_state, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, _StateName, #state{}) ->
|
||||
lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
%% 对mqtt协议的支持, 只需要建立单个链接
|
||||
create_postman(Opts) ->
|
||||
Host = proplists:get_value(host, Opts),
|
||||
Port = proplists:get_value(port, Opts),
|
||||
Username = proplists:get_value(username, Opts),
|
||||
Password = proplists:get_value(password, Opts),
|
||||
Topic = proplists:get_value(topic, Opts),
|
||||
Qos = proplists:get_value(qos, Opts),
|
||||
|
||||
Node = atom_to_binary(node()),
|
||||
ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt">>,
|
||||
PostmanOpts = [
|
||||
{clientid, ClientId},
|
||||
{host, Host},
|
||||
{port, Port},
|
||||
{tcp_opts, []},
|
||||
{username, Username},
|
||||
{password, Password},
|
||||
{keepalive, 86400},
|
||||
{auto_ack, true},
|
||||
{connect_timeout, 5000},
|
||||
{proto_ver, v5},
|
||||
{retry_interval, 5000}
|
||||
],
|
||||
|
||||
mqtt_postman:start_link(PostmanOpts, Topic, Qos).
|
||||
|
||||
-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return().
|
||||
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
|
||||
Data = #{
|
||||
<<"version">> => <<"1.0">>,
|
||||
<<"location_code">> => LocationCode,
|
||||
<<"ts">> => Timestamp,
|
||||
<<"repair">> => true,
|
||||
<<"properties">> => Fields
|
||||
},
|
||||
try
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}
|
||||
catch _:_ ->
|
||||
self() ! {ack, Id, <<"json error">>}
|
||||
end.
|
||||
@ -28,14 +28,7 @@ start_link() ->
|
||||
init([]) ->
|
||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||
Specs = [
|
||||
#{
|
||||
id => 'iot_zd_endpoint',
|
||||
start => {'iot_zd_endpoint', start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['iot_zd_endpoint']
|
||||
}
|
||||
|
||||
],
|
||||
|
||||
{ok, {SupFlags, pools() ++ Specs}}.
|
||||
|
||||
@ -1,26 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 04. 7月 2023 12:31
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(mnesia_id_generator).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([next_id/1, create_table/0]).
|
||||
|
||||
create_table() ->
|
||||
%% id生成器
|
||||
mnesia:create_table(id_generator, [
|
||||
{attributes, record_info(fields, id_generator)},
|
||||
{record_name, id_generator},
|
||||
{disc_copies, [node()]},
|
||||
{type, ordered_set}
|
||||
]).
|
||||
|
||||
next_id(Tab) when is_atom(Tab) ->
|
||||
mnesia:dirty_update_counter(id_generator, Tab, 1).
|
||||
@ -1,61 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 26. 7月 2023 10:40
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(mnesia_queue).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-define(TAB_NAME, 'queue_data:zhongdian').
|
||||
|
||||
%% API
|
||||
-export([create_table/0]).
|
||||
-export([insert/1, delete/1, table_size/0, dirty_fetch_next/1]).
|
||||
|
||||
create_table() ->
|
||||
%% 数据转发缓存表
|
||||
mnesia:create_table(?TAB_NAME, [
|
||||
{attributes, record_info(fields, north_data)},
|
||||
{record_name, north_data},
|
||||
{disc_copies, [node()]},
|
||||
{type, ordered_set}
|
||||
]).
|
||||
|
||||
-spec insert(#north_data{}) -> ok | {error, Reason :: any()}.
|
||||
insert(Item = #north_data{}) ->
|
||||
Id = mnesia_id_generator:next_id(?TAB_NAME),
|
||||
NItem = Item#north_data{id = Id},
|
||||
case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, NItem, write) end) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec delete(Key :: any()) -> ok | {error, Reason :: any()}.
|
||||
delete(Key) when is_integer(Key) ->
|
||||
case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, Key, write) end) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec table_size() -> integer().
|
||||
table_size() ->
|
||||
mnesia:table_info(?TAB_NAME, size).
|
||||
|
||||
-spec dirty_fetch_next(Cursor :: integer()) ->
|
||||
{ok, NCursor :: integer(), Item :: any()} | '$end_of_table'.
|
||||
dirty_fetch_next(Cursor) when is_integer(Cursor) ->
|
||||
case mnesia:dirty_next(?TAB_NAME, Cursor) of
|
||||
'$end_of_table' ->
|
||||
'$end_of_table';
|
||||
NextKey ->
|
||||
[Item] = mnesia:dirty_read(?TAB_NAME, NextKey),
|
||||
{ok, NextKey, Item}
|
||||
end.
|
||||
@ -1,103 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 26. 7月 2023 10:40
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(mnesia_totalizator).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
-define(TAB_NAME, totalizator).
|
||||
|
||||
%% API
|
||||
-export([create_table/0]).
|
||||
-export([increment_success/2, increment_fail/2, delete/2, table_size/0, query/2]).
|
||||
|
||||
create_table() ->
|
||||
%% id生成器
|
||||
mnesia:create_table(?TAB_NAME, [
|
||||
{attributes, record_info(fields, totalizator)},
|
||||
{record_name, totalizator},
|
||||
{disc_copies, [node()]},
|
||||
{type, ordered_set}
|
||||
]).
|
||||
|
||||
-spec query(SceneIds :: [integer()], Dates :: [calendar:date()]) -> [map()].
|
||||
query(SceneIds, Dates) when is_list(SceneIds), is_list(Dates) ->
|
||||
lists:map(fun(Date) ->
|
||||
Scenes = lists:map(fun(SceneId) ->
|
||||
Key = {SceneId, Date},
|
||||
case mnesia:dirty_read(?TAB_NAME, Key) of
|
||||
[R | _] ->
|
||||
to_map(R);
|
||||
[] ->
|
||||
#{<<"scene_id">> => SceneId, <<"success_num">> => 0, <<"fail_num">> => 0}
|
||||
end
|
||||
end, SceneIds),
|
||||
#{<<"date">> => format_date(Date), <<"scenes">> => Scenes}
|
||||
end, Dates).
|
||||
|
||||
-spec increment_success(SceneId :: integer(), IncNum :: integer()) -> ok | {error, Reason :: any()}.
|
||||
increment_success(SceneId, IncNum) when is_integer(SceneId), is_integer(IncNum) ->
|
||||
increment(SceneId, success, IncNum).
|
||||
|
||||
-spec increment_fail(SceneId :: integer(), IncNum :: integer()) -> ok | {error, Reason :: any()}.
|
||||
increment_fail(SceneId, IncNum) when is_integer(SceneId), is_integer(IncNum) ->
|
||||
increment(SceneId, fail, IncNum).
|
||||
|
||||
-spec increment(SceneId :: integer(), Type :: atom(), IncNum :: integer()) -> ok | {error, Reason :: any()}.
|
||||
increment(SceneId, Type, IncNum) when is_integer(SceneId), is_integer(IncNum), is_atom(Type) ->
|
||||
{Date, _} = calendar:local_time(),
|
||||
Key = {SceneId, Date},
|
||||
Fun = fun() ->
|
||||
case mnesia:read(?TAB_NAME, Key) of
|
||||
[R = #totalizator{option = Option = #option{success_num = SuccessNum, fail_num = FailNum}} | _] ->
|
||||
NOption = case Type of
|
||||
success ->
|
||||
Option#option{success_num = SuccessNum + IncNum};
|
||||
fail ->
|
||||
Option#option{fail_num = FailNum + IncNum}
|
||||
end,
|
||||
NR = R#totalizator{option = NOption},
|
||||
mnesia:write(?TAB_NAME, NR, write);
|
||||
[] ->
|
||||
Option = case Type of
|
||||
success ->
|
||||
#option{success_num = IncNum};
|
||||
fail ->
|
||||
#option{fail_num = IncNum}
|
||||
end,
|
||||
R = #totalizator{key = Key, scene_id = SceneId, date = Date, option = Option},
|
||||
mnesia:write(?TAB_NAME, R, write)
|
||||
end
|
||||
end,
|
||||
|
||||
case mnesia:transaction(Fun) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec delete(SceneId :: integer(), Date :: calendar:date()) -> ok | {error, Reason :: any()}.
|
||||
delete(SceneId, Date) when is_integer(SceneId), is_tuple(Date) ->
|
||||
case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, {SceneId, Date}, write) end) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec table_size() -> integer().
|
||||
table_size() ->
|
||||
mnesia:table_info(?TAB_NAME, size).
|
||||
|
||||
to_map(#totalizator{scene_id = SceneId, option = #option{success_num = SuccessNum, fail_num = FailNum}}) ->
|
||||
#{<<"scene_id">> => SceneId, <<"success_num">> => SuccessNum, <<"fail_num">> => FailNum}.
|
||||
|
||||
format_date({Year, Month, Day}) ->
|
||||
iolist_to_binary(io_lib:format("~b-~2..0b-~2..0b", [Year, Month, Day])).
|
||||
@ -7,7 +7,7 @@
|
||||
%%% @end
|
||||
%%% Created : 12. 3月 2023 21:27
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_zd_consumer).
|
||||
-module(iot_mqtt_consumer).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
@ -36,23 +36,6 @@
|
||||
{<<"test">>, <<"iot2023">>}
|
||||
]},
|
||||
|
||||
%% 配置中电的数据转发, mqtt协议
|
||||
{zhongdian, [
|
||||
{host, "39.98.184.67"},
|
||||
{port, 1883},
|
||||
{username, "test"},
|
||||
{password, "test1234"},
|
||||
{topic, "CET/NX/upload"},
|
||||
{qos, 2}
|
||||
]},
|
||||
|
||||
%% 金智调度系统
|
||||
{jinzhi, [
|
||||
{pri_key, "jinzhi_pri.key"},
|
||||
{url, "http://172.30.6.177:9080/device/push"},
|
||||
{pool_size, 10}
|
||||
]},
|
||||
|
||||
{pools, [
|
||||
%% redis连接池
|
||||
{redis_pool,
|
||||
|
||||
@ -25,23 +25,6 @@
|
||||
|
||||
{api_url, "https://lgsiot.njau.edu.cn/api/v1/taskLog"},
|
||||
|
||||
%% 配置中电的数据转发, mqtt协议
|
||||
{zhongdian, [
|
||||
{host, "172.30.6.161"},
|
||||
{port, 1883},
|
||||
{username, "admin"},
|
||||
{password, "123456"},
|
||||
{topic, "CET/NX/upload"},
|
||||
{qos, 2}
|
||||
]},
|
||||
|
||||
%% 金智调度系统
|
||||
{jinzhi, [
|
||||
{pri_key, "jinzhi_pri.key"},
|
||||
{url, "http://172.30.6.177:9080/device/push"},
|
||||
{pool_size, 10}
|
||||
]},
|
||||
|
||||
{influxdb, [
|
||||
{host, "172.19.0.4"},
|
||||
{port, 8086},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user