From 2eb97210f4347e80385e9d3736a0fcb8fcfefd7f Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 7 May 2024 22:36:12 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/endpoint/iot_http_endpoint.erl | 170 ----------- apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 273 ------------------ apps/iot/src/endpoint/iot_zd_endpoint.erl | 260 ----------------- apps/iot/src/iot_sup.erl | 9 +- apps/iot/src/mnesia/mnesia_id_generator.erl | 26 -- apps/iot/src/mnesia/mnesia_queue.erl | 61 ---- apps/iot/src/mnesia/mnesia_totalizator.erl | 103 ------- .../iot_mqtt_consumer.erl} | 2 +- config/sys-dev.config | 17 -- config/sys-prod.config | 17 -- 10 files changed, 2 insertions(+), 936 deletions(-) delete mode 100644 apps/iot/src/endpoint/iot_http_endpoint.erl delete mode 100644 apps/iot/src/endpoint/iot_jinzhi_endpoint.erl delete mode 100644 apps/iot/src/endpoint/iot_zd_endpoint.erl delete mode 100644 apps/iot/src/mnesia/mnesia_id_generator.erl delete mode 100644 apps/iot/src/mnesia/mnesia_queue.erl delete mode 100644 apps/iot/src/mnesia/mnesia_totalizator.erl rename apps/iot/src/{consumer/iot_zd_consumer.erl => mocker/iot_mqtt_consumer.erl} (99%) diff --git a/apps/iot/src/endpoint/iot_http_endpoint.erl b/apps/iot/src/endpoint/iot_http_endpoint.erl deleted file mode 100644 index 03d7df9..0000000 --- a/apps/iot/src/endpoint/iot_http_endpoint.erl +++ /dev/null @@ -1,170 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 06. 7月 2023 12:02 -%%%------------------------------------------------------------------- --module(iot_http_endpoint). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_statem). - -%% API --export([start_link/2]). --export([get_pid/1, forward/4, get_stat/0]). - -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). - -%% 消息重发间隔 --define(RETRY_INTERVAL, 5000). - --record(state, { - postman_pid :: undefined | pid(), - pool_size = 0, - flight_num = 0, - id = 1, - queue :: queue:queue(), - %% 定时器对应关系 - timer_map = #{}, - %% 记录成功处理的消息数 - acc_num = 0 -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec get_pid(Name :: atom()) -> undefined | pid(). -get_pid(Name) when is_atom(Name) -> - whereis(Name). - --spec forward(Pid :: pid(), LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -forward(Pid, LocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_integer(EventType), is_map(Params) -> - gen_statem:cast(Pid, {forward, LocationCode, EventType, Params}). - --spec get_stat() -> {ok, Stat :: #{}}. -get_stat() -> - gen_statem:call(?MODULE, get_stat, 5000). - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. -start_link(Name, Opts) when is_atom(Name), is_list(Opts) -> - gen_statem:start_link({local, Name}, ?MODULE, [Opts], []). - -%%%=================================================================== -%%% gen_statem callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. -init([Opts]) -> - PoolSize = proplists:get_value(pool_size, Opts), - {ok, PostmanPid} = broker_postman:start_link(http_postman, Opts, PoolSize), - - {ok, connected, #state{postman_pid = PostmanPid, pool_size = PoolSize, queue = queue:new()}}. - -%% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. - -%% @private -%% @doc There should be one instance of this function for each possible -%% state name. If callback_mode is state_functions, one of these -%% functions is called when gen_statem receives and event from -%% call/2, cast/2, or as a normal process message. - -handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}, - %% 避免不必要的内部消息 - Actions = case FlightNum < PoolSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions}; - -%% 触发读取下一条数据 -handle_event(info, fetch_next, _, State = #state{postman_pid = PostmanPid, queue = Q, flight_num = FlightNum, timer_map = TimerMap}) -> - case queue:out(Q) of - {{value, EventData = #event_data{id = Id}}, Q1} -> - lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]), - do_post(PostmanPid, EventData), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}), - - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}}; - {empty, Q1} -> - {keep_state, State#state{queue = Q1}} - end; - -%% 收到确认消息 -handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> - lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]), - case maps:take(Id, TimerMap) of - error -> - {keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}; - {TimerRef, NTimerMap} -> - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - {keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]} - end; - -%% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{postman_pid = PostmanPid, timer_map = TimerMap}) -> - lager:debug("[iot_zd_endpoint] repost data: ~p", [EventData]), - do_post(PostmanPid, EventData), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}), - - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; - -%% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> - Stat = #{ - <<"acc_num">> => AccNum, - <<"queue_num">> => mnesia_queue:table_size(), - <<"state_name">> => atom_to_binary(StateName) - }, - {keep_state, State, [{reply, From, Stat}]}; - -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event(EventType, Event, StateName, State) -> - lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), - {keep_state, State}. - -%% @private -%% @doc This function is called by a gen_statem when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, _StateName, #state{}) -> - lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return(). -do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) -> - Data = #{ - <<"version">> => <<"1.0">>, - <<"event_type">> => EventType, - <<"params">> => Params - }, - Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, - ok. \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl deleted file mode 100644 index fd1a5d9..0000000 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ /dev/null @@ -1,273 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 06. 7月 2023 12:02 -%%%------------------------------------------------------------------- --module(iot_jinzhi_endpoint). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_statem). - -%% API --export([start_link/0]). --export([get_pid/0, forward/3, get_stat/0]). - -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). - -%% 消息重发间隔 --define(RETRY_INTERVAL, 5000). -%% 系统id --define(SYS_ID, <<"ZNWLZJJKXT">>). - --record(state, { - url :: string(), - logger_pid :: pid(), - pool_size = 0, - flight_num = 0, - pri_key :: public_key:private_key(), - id = 1, - queue :: queue:queue(), - %% 定时器对应关系 - timer_map = #{}, - %% 记录成功处理的消息数 - acc_num = 0 -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec get_pid() -> undefined | pid(). -get_pid() -> - whereis(?MODULE). - --spec forward(LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -forward(LocationCode, EventType, Params) when is_binary(LocationCode), is_integer(EventType), is_map(Params) -> - gen_statem:cast(?MODULE, {forward, LocationCode, EventType, Params}). - --spec get_stat() -> {ok, Stat :: #{}}. -get_stat() -> - gen_statem:call(?MODULE, get_stat, 5000). - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. -start_link() -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_statem callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. -init([]) -> - {ok, Opts} = application:get_env(iot, jinzhi), - - PoolSize = proplists:get_value(pool_size, Opts), - PriFile = proplists:get_value(pri_key, Opts), - Url = proplists:get_value(url, Opts), - - {ok, LoggerPid} = iot_logger:start_link("ai_event_data"), - PriKey = generate_private_key(PriFile), - - {ok, connected, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey, pool_size = PoolSize, queue = queue:new()}}. - -%% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. - -%% @private -%% @doc There should be one instance of this function for each possible -%% state name. If callback_mode is state_functions, one of these -%% functions is called when gen_statem receives and event from -%% call/2, cast/2, or as a normal process message. - -handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}, - %% 避免不必要的内部消息 - Actions = case FlightNum < PoolSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions}; - -%% 触发读取下一条数据 -handle_event(info, fetch_next, _, State = #state{queue = Q, flight_num = FlightNum, timer_map = TimerMap}) -> - case queue:out(Q) of - {{value, EventData = #event_data{id = Id}}, Q1} -> - lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]), - catch do_post(EventData, State), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}), - - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}}; - {empty, Q1} -> - {keep_state, State#state{queue = Q1}} - end; - -%% 收到确认消息 -handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> - case maps:take(Id, TimerMap) of - error -> - {keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}; - {TimerRef, NTimerMap} -> - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - {keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]} - end; - -%% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{timer_map = TimerMap}) -> - lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [EventData]), - catch do_post(EventData, State), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}), - - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}; - -%% Task进程挂掉 -handle_event(info, {'DOWN', _MRef, process, _Pid, normal}, _, State) -> - {keep_state, State}; - -handle_event(info, {'DOWN', _MRef, process, _Pid, Reason}, _, State) -> - lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]), - {keep_state, State}; - -%% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> - Stat = #{ - <<"acc_num">> => AccNum, - <<"queue_num">> => mnesia_queue:table_size(), - <<"state_name">> => atom_to_binary(StateName) - }, - {keep_state, State, [{reply, From, Stat}]}; - -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event(EventType, Event, StateName, State) -> - lager:warning("[iot_jinzhi_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), - {keep_state, State}. - -%% @private -%% @doc This function is called by a gen_statem when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, _StateName, #state{}) -> - lager:debug("[iot_jinzhi_endpoint] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return(). -do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventType, - params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}, - #state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) -> - - % <<"occurrenceTime">> => <<"2023-06-10 12:00:00">>, - - Attachments = lists:map(fun(#{<<"filename">> := Filename}) -> - {ok, FileUrl} = iot_util:file_uri(Filename), - Name = filename:basename(FileUrl), - #{<<"name">> => Name, <<"url">> => FileUrl} - end, Attachments0), - - DeviceInfo = #{ - <<"location">> => LocationCode, - <<"category">> => EventCode, - <<"description">> => Description, - <<"occurrenceTime">> => Datetime, - <<"attachments">> => Attachments - }, - - ReqData = #{ - <<"sign">> => sign(DeviceInfo, PriKey), - <<"sysId">> => ?SYS_ID, - <<"deviceInfo">> => DeviceInfo - }, - Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), - - lager:debug("[iot_jinzhi_endpoint] do_post url: ~p, event_type: ~p, params: ~p, location_code: ~p, body: ~p", [Url, EventType, Params, LocationCode, Body]), - - ReceiverPid = self(), - %% 异步提交任务 - spawn_monitor(fun() -> - Headers = [ - {<<"content-type">>, <<"application/json">>} - ], - case hackney:request(post, Url, Headers, Body, [{pool, false}]) of - {ok, 200, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - hackney:close(ClientRef), - iot_logger:write(LoggerPid, [Body, RespBody]), - ReceiverPid ! {ack, Id}; - {ok, HttpCode, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - hackney:close(ClientRef), - lager:warning("[iot_jinzhi_endpoint] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]); - {error, Reason} -> - lager:warning("[iot_jinzhi_endpoint] send body: ~p, get error is: ~p", [Body, Reason]) - end - end). - --spec generate_private_key(PriFile :: string()) -> public_key:private_key(). -generate_private_key(PriFile) when is_list(PriFile) -> - PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile, - %% 私钥保存解析后的 - {ok, PriKeyData} = file:read_file(PriKeyFile), - PriDerData = base64:decode(PriKeyData), - public_key:der_decode('PrivateKeyInfo', PriDerData). - -%% 数据签名 --spec sign(M :: #{}, PrivateKey :: public_key:private_key()) -> binary(). -sign(M, PrivateKey) when is_map(M) -> - Json = serialize(M), - Hash = iolist_to_binary(io_lib:format("~64.16.0b", [binary:decode_unsigned(crypto:hash(sha256, Json))])), - RsaEncoded = public_key:encrypt_private(Hash, PrivateKey), - - base64:encode(RsaEncoded). - -%% 简单的序列号,sign签名 --spec serialize(M :: map()) -> JsonString :: binary(). -serialize(M) when is_map(M) -> - L = maps:to_list(M), - L1 = lists:sort(fun({K, _}, {K1, _}) -> K < K1 end, L), - serialize(L1, []). -serialize([], Target) -> - B = iolist_to_binary(lists:join(<<$,>>, lists:reverse(Target))), - <<${, B/binary, $}>>; -serialize([{K, V}|T], Target) -> - V1 = if - is_integer(V) -> - integer_to_binary(V); - is_float(V) -> - float_to_binary(V); - is_binary(V) -> - <<$", V/binary, $">>; - is_boolean(V) andalso V -> - <<"true">>; - is_boolean(V) andalso not V -> - <<"false">>; - is_list(V) -> - Items = lists:map(fun(E) -> serialize(E) end, V), - V0 = iolist_to_binary(lists:join(<<$,>>, Items)), - <<$[, V0/binary, $]>> - end, - Item = <<$", K/binary, $", $:, V1/binary>>, - serialize(T, [Item|Target]). diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl deleted file mode 100644 index fe2645e..0000000 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ /dev/null @@ -1,260 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 06. 7月 2023 12:02 -%%%------------------------------------------------------------------- --module(iot_zd_endpoint). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_statem). - -%% API --export([start_link/0]). --export([get_pid/0, forward/3, get_stat/0]). --export([parse_json_file/0, export/0, export_n/1]). - -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). - -%% 消息重发间隔 --define(RETRY_INTERVAL, 5000). - --record(state, { - mqtt_opts = [], - postman_pid :: undefined | pid(), - logger_pid :: pid(), - - %% 当前数据的游标, #north_data的id - cursor = 0 :: integer(), - %% 定时器 - timer_ref :: undefined | reference(), - %% 是否繁忙 - is_busy = false :: boolean(), - - %% 记录成功处理的消息数 - acc_num = 0, - - queue = queue:new() -}). - -export() -> - Lines = parse_json_file(), - [begin - case catch jiffy:decode(Line, [return_maps]) of - M when is_map(M) -> - export0(M); - Error -> - lager:notice("invalid data: ~p, error: ~p", [Line, Error]) - end - end || Line <- Lines]. - -export_n(N) when N > 0 -> - Lines0 = parse_json_file(), - Lines = lists:sublist(Lines0, 1, N), - [begin - case catch jiffy:decode(Line, [return_maps]) of - M when is_map(M) -> - export0(M); - Error -> - lager:notice("invalid data: ~p, error: ~p", [Line, Error]) - end - end || Line <- Lines]. - -export0(Fields0 = #{<<"device_uuid">> := DeviceUUID, <<"timestamp">> := Timestamp}) -> - Fields = lists:foldl(fun(Key, M) -> maps:remove(Key, M) end, Fields0, [<<"device_uuid">>]), - %% 查找终端设备对应的点位信息 - case redis_client:hget(DeviceUUID, <<"location_code">>) of - {ok, undefined} -> - lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [DeviceUUID, Fields]); - {ok, LocationCode} when is_binary(LocationCode) -> - iot_zd_endpoint:forward(LocationCode, [Fields], Timestamp); - {error, Reason} -> - lager:warning("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [DeviceUUID, Reason, Fields]) - end. - -parse_json_file() -> - File = code:priv_dir(iot) ++ "/2024-01-18-log.txt", - {ok, Content} = file:read_file(File), - binary:split(Content, <<$\n>>, [global, trim]). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec get_pid() -> undefined | pid(). -get_pid() -> - whereis(?MODULE). - --spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). -forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> - gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}). - --spec get_stat() -> {ok, Stat :: #{}}. -get_stat() -> - gen_statem:call(?MODULE, get_stat, 5000). - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. -start_link() -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_statem callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. -init([]) -> - {ok, Opts} = application:get_env(iot, zhongdian), - - {ok, PostmanPid} = create_postman(Opts), - %% 启动日志记录器 - {ok, LoggerPid} = iot_logger:start_link("north_data_fix"), - - {ok, connected, #state{mqtt_opts = Opts, postman_pid = PostmanPid, logger_pid = LoggerPid}}. - -%% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. - -%% @private -%% @doc There should be one instance of this function for each possible -%% state name. If callback_mode is state_functions, one of these -%% functions is called when gen_statem receives and event from -%% call/2, cast/2, or as a normal process message. - -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy, queue = Q}) -> - Q1 = queue:in(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}, Q), - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso not IsBusy of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State#state{queue = Q1}, Actions}; - -%% 触发读取下一条数据 -handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> - {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, queue = Q}) -> - case queue:out(Q) of - {{value, NorthData}, Q1} -> - lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]), - do_post(PostmanPid, NorthData), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - - {keep_state, State#state{queue = Q1, timer_ref = TimerRef, is_busy = true}}; - {empty, Q1} -> - {keep_state, State#state{queue = Q1}} - end; - -%% 收到确认消息 -handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) -> - %% 记录日志信息 - iot_logger:write(LoggerPid, AssocMessage), - - lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]), - Actions = case StateName =:= connected of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - - {keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions}; - -%% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid}) -> - lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]), - do_post(PostmanPid, NorthData), - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - - {keep_state, State#state{timer_ref = TimerRef}}; - -%% 离线时,忽略超时逻辑 -handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> - {keep_state, State}; - -%% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> - Stat = #{ - <<"acc_num">> => AccNum, - <<"state_name">> => atom_to_binary(StateName) - }, - {keep_state, State, [{reply, From, Stat}]}; - -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event(EventType, Event, StateName, State) -> - lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), - {keep_state, State}. - -%% @private -%% @doc This function is called by a gen_statem when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, _StateName, #state{}) -> - lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% 对mqtt协议的支持, 只需要建立单个链接 -create_postman(Opts) -> - Host = proplists:get_value(host, Opts), - Port = proplists:get_value(port, Opts), - Username = proplists:get_value(username, Opts), - Password = proplists:get_value(password, Opts), - Topic = proplists:get_value(topic, Opts), - Qos = proplists:get_value(qos, Opts), - - Node = atom_to_binary(node()), - ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt">>, - PostmanOpts = [ - {clientid, ClientId}, - {host, Host}, - {port, Port}, - {tcp_opts, []}, - {username, Username}, - {password, Password}, - {keepalive, 86400}, - {auto_ack, true}, - {connect_timeout, 5000}, - {proto_ver, v5}, - {retry_interval, 5000} - ], - - mqtt_postman:start_link(PostmanOpts, Topic, Qos). - --spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return(). -do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> - Data = #{ - <<"version">> => <<"1.0">>, - <<"location_code">> => LocationCode, - <<"ts">> => Timestamp, - <<"repair">> => true, - <<"properties">> => Fields - }, - try - Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}} - catch _:_ -> - self() ! {ack, Id, <<"json error">>} - end. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index f1e442c..fc8ac1b 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,14 +28,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ - #{ - id => 'iot_zd_endpoint', - start => {'iot_zd_endpoint', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_zd_endpoint'] - } + ], {ok, {SupFlags, pools() ++ Specs}}. diff --git a/apps/iot/src/mnesia/mnesia_id_generator.erl b/apps/iot/src/mnesia/mnesia_id_generator.erl deleted file mode 100644 index e7a9b6c..0000000 --- a/apps/iot/src/mnesia/mnesia_id_generator.erl +++ /dev/null @@ -1,26 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 12:31 -%%%------------------------------------------------------------------- --module(mnesia_id_generator). --author("aresei"). --include("iot.hrl"). - -%% API --export([next_id/1, create_table/0]). - -create_table() -> - %% id生成器 - mnesia:create_table(id_generator, [ - {attributes, record_info(fields, id_generator)}, - {record_name, id_generator}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - -next_id(Tab) when is_atom(Tab) -> - mnesia:dirty_update_counter(id_generator, Tab, 1). \ No newline at end of file diff --git a/apps/iot/src/mnesia/mnesia_queue.erl b/apps/iot/src/mnesia/mnesia_queue.erl deleted file mode 100644 index bae59eb..0000000 --- a/apps/iot/src/mnesia/mnesia_queue.erl +++ /dev/null @@ -1,61 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 26. 7月 2023 10:40 -%%%------------------------------------------------------------------- --module(mnesia_queue). --author("aresei"). --include("iot.hrl"). - --define(TAB_NAME, 'queue_data:zhongdian'). - -%% API --export([create_table/0]). --export([insert/1, delete/1, table_size/0, dirty_fetch_next/1]). - -create_table() -> - %% 数据转发缓存表 - mnesia:create_table(?TAB_NAME, [ - {attributes, record_info(fields, north_data)}, - {record_name, north_data}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - --spec insert(#north_data{}) -> ok | {error, Reason :: any()}. -insert(Item = #north_data{}) -> - Id = mnesia_id_generator:next_id(?TAB_NAME), - NItem = Item#north_data{id = Id}, - case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, NItem, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec delete(Key :: any()) -> ok | {error, Reason :: any()}. -delete(Key) when is_integer(Key) -> - case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, Key, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec table_size() -> integer(). -table_size() -> - mnesia:table_info(?TAB_NAME, size). - --spec dirty_fetch_next(Cursor :: integer()) -> - {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. -dirty_fetch_next(Cursor) when is_integer(Cursor) -> - case mnesia:dirty_next(?TAB_NAME, Cursor) of - '$end_of_table' -> - '$end_of_table'; - NextKey -> - [Item] = mnesia:dirty_read(?TAB_NAME, NextKey), - {ok, NextKey, Item} - end. \ No newline at end of file diff --git a/apps/iot/src/mnesia/mnesia_totalizator.erl b/apps/iot/src/mnesia/mnesia_totalizator.erl deleted file mode 100644 index 833b9a9..0000000 --- a/apps/iot/src/mnesia/mnesia_totalizator.erl +++ /dev/null @@ -1,103 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 26. 7月 2023 10:40 -%%%------------------------------------------------------------------- --module(mnesia_totalizator). --author("aresei"). --include("iot.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --define(TAB_NAME, totalizator). - -%% API --export([create_table/0]). --export([increment_success/2, increment_fail/2, delete/2, table_size/0, query/2]). - -create_table() -> - %% id生成器 - mnesia:create_table(?TAB_NAME, [ - {attributes, record_info(fields, totalizator)}, - {record_name, totalizator}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - --spec query(SceneIds :: [integer()], Dates :: [calendar:date()]) -> [map()]. -query(SceneIds, Dates) when is_list(SceneIds), is_list(Dates) -> - lists:map(fun(Date) -> - Scenes = lists:map(fun(SceneId) -> - Key = {SceneId, Date}, - case mnesia:dirty_read(?TAB_NAME, Key) of - [R | _] -> - to_map(R); - [] -> - #{<<"scene_id">> => SceneId, <<"success_num">> => 0, <<"fail_num">> => 0} - end - end, SceneIds), - #{<<"date">> => format_date(Date), <<"scenes">> => Scenes} - end, Dates). - --spec increment_success(SceneId :: integer(), IncNum :: integer()) -> ok | {error, Reason :: any()}. -increment_success(SceneId, IncNum) when is_integer(SceneId), is_integer(IncNum) -> - increment(SceneId, success, IncNum). - --spec increment_fail(SceneId :: integer(), IncNum :: integer()) -> ok | {error, Reason :: any()}. -increment_fail(SceneId, IncNum) when is_integer(SceneId), is_integer(IncNum) -> - increment(SceneId, fail, IncNum). - --spec increment(SceneId :: integer(), Type :: atom(), IncNum :: integer()) -> ok | {error, Reason :: any()}. -increment(SceneId, Type, IncNum) when is_integer(SceneId), is_integer(IncNum), is_atom(Type) -> - {Date, _} = calendar:local_time(), - Key = {SceneId, Date}, - Fun = fun() -> - case mnesia:read(?TAB_NAME, Key) of - [R = #totalizator{option = Option = #option{success_num = SuccessNum, fail_num = FailNum}} | _] -> - NOption = case Type of - success -> - Option#option{success_num = SuccessNum + IncNum}; - fail -> - Option#option{fail_num = FailNum + IncNum} - end, - NR = R#totalizator{option = NOption}, - mnesia:write(?TAB_NAME, NR, write); - [] -> - Option = case Type of - success -> - #option{success_num = IncNum}; - fail -> - #option{fail_num = IncNum} - end, - R = #totalizator{key = Key, scene_id = SceneId, date = Date, option = Option}, - mnesia:write(?TAB_NAME, R, write) - end - end, - - case mnesia:transaction(Fun) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec delete(SceneId :: integer(), Date :: calendar:date()) -> ok | {error, Reason :: any()}. -delete(SceneId, Date) when is_integer(SceneId), is_tuple(Date) -> - case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, {SceneId, Date}, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec table_size() -> integer(). -table_size() -> - mnesia:table_info(?TAB_NAME, size). - -to_map(#totalizator{scene_id = SceneId, option = #option{success_num = SuccessNum, fail_num = FailNum}}) -> - #{<<"scene_id">> => SceneId, <<"success_num">> => SuccessNum, <<"fail_num">> => FailNum}. - -format_date({Year, Month, Day}) -> - iolist_to_binary(io_lib:format("~b-~2..0b-~2..0b", [Year, Month, Day])). \ No newline at end of file diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/mocker/iot_mqtt_consumer.erl similarity index 99% rename from apps/iot/src/consumer/iot_zd_consumer.erl rename to apps/iot/src/mocker/iot_mqtt_consumer.erl index 581c9a7..f463380 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/mocker/iot_mqtt_consumer.erl @@ -7,7 +7,7 @@ %%% @end %%% Created : 12. 3月 2023 21:27 %%%------------------------------------------------------------------- --module(iot_zd_consumer). +-module(iot_mqtt_consumer). -author("aresei"). -include("iot.hrl"). diff --git a/config/sys-dev.config b/config/sys-dev.config index c3af650..d3d0b46 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -36,23 +36,6 @@ {<<"test">>, <<"iot2023">>} ]}, - %% 配置中电的数据转发, mqtt协议 - {zhongdian, [ - {host, "39.98.184.67"}, - {port, 1883}, - {username, "test"}, - {password, "test1234"}, - {topic, "CET/NX/upload"}, - {qos, 2} - ]}, - - %% 金智调度系统 - {jinzhi, [ - {pri_key, "jinzhi_pri.key"}, - {url, "http://172.30.6.177:9080/device/push"}, - {pool_size, 10} - ]}, - {pools, [ %% redis连接池 {redis_pool, diff --git a/config/sys-prod.config b/config/sys-prod.config index 43abf60..15c4e5a 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -25,23 +25,6 @@ {api_url, "https://lgsiot.njau.edu.cn/api/v1/taskLog"}, - %% 配置中电的数据转发, mqtt协议 - {zhongdian, [ - {host, "172.30.6.161"}, - {port, 1883}, - {username, "admin"}, - {password, "123456"}, - {topic, "CET/NX/upload"}, - {qos, 2} - ]}, - - %% 金智调度系统 - {jinzhi, [ - {pri_key, "jinzhi_pri.key"}, - {url, "http://172.30.6.177:9080/device/push"}, - {pool_size, 10} - ]}, - {influxdb, [ {host, "172.19.0.4"}, {port, 8086},