对接金智系统

This commit is contained in:
anlicheng 2023-12-04 11:30:34 +08:00
parent 7f0b5e546c
commit 73ff03de9c
20 changed files with 1113 additions and 57 deletions

View File

@ -34,6 +34,8 @@
-define(METHOD_FEEDBACK_STEP, 16#05).
-define(METHOD_FEEDBACK_RESULT, 16#06).
-define(METHOD_EVENT, 16#07).
%% ai识别的事件上报
-define(METHOD_AI_EVENT, 16#08).
%%
-define(PACKET_REQUEST, 16#01).
@ -46,6 +48,9 @@
%%
-define(EVENT_HOST, 16#02).
%% ai相关的事件
-define(EVENT_AI, 16#03).
%%
-record(kv, {
key :: binary(),
@ -69,6 +74,14 @@
timestamp = 0 :: integer()
}).
%%
-record(event_data, {
id = 0 :: integer(),
location_code :: binary(),
event_type :: integer(),
params :: map()
}).
%%
-record(post_data, {
id = 0 :: integer(),

View File

@ -0,0 +1 @@
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBALHOer3l1/Op2N9m8SGeoryvumNjcz7yD41YmqTjIEptA20l4k3MIT5R6iCwLeky2QGk/ZHn1es6Z7SCUFk6x4+dFZ40HuT7CeRPpeRo2U/vxPt/FzChClpo79TclCvJBemnOJ8bC0z/Afm/kfs3LSYNbNIA6qy+IitifIKg2DfpAgMBAAECgYAz0+rlNXz4Encbz2bUFOh8tYBP/ioWm/o6iiwxid7cst//zb4kTS8XeksTkicfxWmJ2CztfbVWJqUZ8a44BDEsxrbLwVvuAPNdUChyoOkT0LeYEaeVaV35m6Hv3EkCeTUne8GQA8Z4Fx4ndpO9YkttQuu/8UQZ0FM73wrNkN0zrQJBAPcDeO61ZgnC6jlbrHj82224g9AXT2UBYzP14TaWWElbF9y3lxMrQ+f/KYzDaE3BR2UZdihv601lze0MsxeCzR8CQQC4RnT6ekvAi9CCktCVV1HJ5kpzpqejNFTs9x4WJYKG14CwbMyDIaKobB/N4Ylv0qliPPDPs4V3DAuFZtnEEtH3AkEAioCE73wBAdor0QuJErHdK5F5P1XCq8TyZfEpXZ1BVahhId5DNHle8xeMqaPruSV1rcdwDE5s5pH9vDwRs04hSwJBAJ8QmotYI6maRqtfhdNTo5MPSbcY5V24n5JJIdxmFozE2x3vXH3Y++o8Ixv5kkRHaNUW25u+T/faGtvVUyawRDMCQQD1ApVjihrgogCGyk00shzBcEzA7ZUGZrI6Fwjf5oanbR2SLUUfnbGWnvdURV6Luq6YsIiFzCL69rjY5aB7EqEp

View File

@ -0,0 +1,26 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 16. 5 2023 12:48
%%%-------------------------------------------------------------------
-module(ai_event_logs_bo).
-author("aresei").
-include("iot.hrl").
-export([insert/4]).
%% API
-spec insert(HostUUID :: binary(), DeviceUUID :: binary(), EventType :: integer(), Content :: binary()) ->
ok | {ok, InsertId :: integer()} | {error, Reason :: any()}.
insert(HostUUID, DeviceUUID, EventType, Content) when is_integer(EventType), is_binary(HostUUID), is_binary(DeviceUUID), is_binary(Content) ->
mysql_pool:insert(mysql_iot, <<"ai_event_logs">>, #{
<<"event_type">> => EventType,
<<"host_uuid">> => HostUUID,
<<"device_uuid">> => DeviceUUID,
<<"content">> => Content,
<<"created_at">> => calendar:local_time()
}, true).

View File

@ -0,0 +1,170 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_http_endpoint).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/2]).
-export([get_pid/1, forward/4, get_stat/0]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%%
-define(RETRY_INTERVAL, 5000).
-record(state, {
postman_pid :: undefined | pid(),
pool_size = 0,
flight_num = 0,
id = 1,
queue :: queue:queue(),
%%
timer_map = #{},
%%
acc_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid(Name :: atom()) -> undefined | pid().
get_pid(Name) when is_atom(Name) ->
whereis(Name).
-spec forward(Pid :: pid(), LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
forward(Pid, LocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
gen_statem:cast(Pid, {forward, LocationCode, EventType, Params}).
-spec get_stat() -> {ok, Stat :: #{}}.
get_stat() ->
gen_statem:call(?MODULE, get_stat, 5000).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Opts) when is_atom(Name), is_list(Opts) ->
gen_statem:start_link({local, Name}, ?MODULE, [Opts], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Opts]) ->
PoolSize = proplists:get_value(pool_size, Opts),
{ok, PostmanPid} = broker_postman:start_link(http_postman, Opts, PoolSize),
{ok, connected, #state{postman_pid = PostmanPid, pool_size = PoolSize, queue = queue:new()}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
%%
Actions = case FlightNum < PoolSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
%%
handle_event(info, fetch_next, _, State = #state{postman_pid = PostmanPid, queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
case queue:out(Q) of
{{value, EventData = #event_data{id = Id}}, Q1} ->
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
do_post(PostmanPid, EventData),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
{empty, Q1} ->
{keep_state, State#state{queue = Q1}}
end;
%%
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
case maps:take(Id, TimerMap) of
error ->
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
{TimerRef, NTimerMap} ->
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
end;
%%
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{postman_pid = PostmanPid, timer_map = TimerMap}) ->
lager:debug("[iot_zd_endpoint] repost data: ~p", [EventData]),
do_post(PostmanPid, EventData),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"queue_num">> => mnesia_queue:table_size(),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{}) ->
lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return().
do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
Data = #{
<<"version">> => <<"1.0">>,
<<"event_type">> => EventType,
<<"params">> => Params
},
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
ok.

View File

@ -0,0 +1,271 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_jinzhi_endpoint).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/1]).
-export([get_pid/0, forward/3, get_stat/0]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%%
-define(RETRY_INTERVAL, 5000).
%% id
-define(SYS_ID, <<"ZNWLZJJKXT">>).
-record(state, {
url :: string(),
logger_pid :: pid(),
pool_size = 0,
flight_num = 0,
pri_key :: public_key:private_key(),
id = 1,
queue :: queue:queue(),
%%
timer_map = #{},
%%
acc_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid() -> undefined | pid().
get_pid() ->
whereis(?MODULE).
-spec forward(LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
forward(LocationCode, EventType, Params) when is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
gen_statem:cast(?MODULE, {forward, LocationCode, EventType, Params}).
-spec get_stat() -> {ok, Stat :: #{}}.
get_stat() ->
gen_statem:call(?MODULE, get_stat, 5000).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Opts) when is_list(Opts) ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Opts]) ->
PoolSize = proplists:get_value(pool_size, Opts),
PriFile = proplists:get_value(pri_key, Opts),
Url = proplists:get_value(url, Opts),
{ok, LoggerPid} = iot_logger:start_link("ai_event_data"),
PriKey = generate_private_key(PriFile),
{ok, connected, #state{url = Url, logger_pid = LoggerPid, pri_key = PriKey, pool_size = PoolSize, queue = queue:new()}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
%%
Actions = case FlightNum < PoolSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{queue = queue:in(EventData, Q), id = Id + 1}, Actions};
%%
handle_event(info, fetch_next, _, State = #state{queue = Q, flight_num = FlightNum, timer_map = TimerMap}) ->
case queue:out(Q) of
{{value, EventData = #event_data{id = Id}}, Q1} ->
lager:debug("[iot_http_endpoint] fetch_next success, event data is: ~p", [EventData]),
do_post(EventData, State),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap), queue = Q1, flight_num = FlightNum + 1}};
{empty, Q1} ->
{keep_state, State#state{queue = Q1}}
end;
%%
handle_event(info, {ack, Id}, _, State = #state{timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
case maps:take(Id, TimerMap) of
error ->
{keep_state, State#state{acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]};
{TimerRef, NTimerMap} ->
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
{keep_state, State#state{timer_map = NTimerMap, acc_num = AccNum + 1, flight_num = FlightNum - 1}, [{next_event, info, fetch_next}]}
end;
%%
handle_event(info, {timeout, _, {repost_ticker, EventData = #event_data{id = Id}}}, _, State = #state{timer_map = TimerMap}) ->
lager:debug("[iot_jinzhi_endpoint] repost data: ~p", [EventData]),
do_post(EventData, State),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, EventData}),
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
%% Task进程挂掉
handle_event(info, {'DOWN', _MRef, process, _Pid, normal}, _, State) ->
{keep_state, State};
handle_event(info, {'DOWN', _MRef, process, _Pid, Reason}, _, State) ->
lager:notice("[iot_jinzhi_endpoint] task process down with reason: ~p", [Reason]),
{keep_state, State};
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"queue_num">> => mnesia_queue:table_size(),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_jinzhi_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{}) ->
lager:debug("[iot_jinzhi_endpoint] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return().
do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
#state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) ->
lager:debug("[iot_jinzhi_endpoint] do_post, event_type: ~p, params: ~p, location_code: ~p", [EventType, Params, LocationCode]),
<<Loc:12/binary, Category:5/binary, _/binary>> = LocationCode,
DeviceInfo = #{
<<"location">> => Loc,
<<"category">> => Category,
<<"description">> => <<"校门口花坛损坏"/utf8>>,
<<"occurrenceTime">> => <<"2023-06-10 12:00:00">>,
<<"attachments">> => [
#{
<<"name">> => <<"损坏图片.jpg"/utf8>>,
<<"url">> => <<"http://www.baidu.com">>
},
#{
<<"name">> => <<"损坏图片.jpg"/utf8>>,
<<"url">> => <<"http://www.baidu.com">>
}
]
},
ReqData = #{
<<"sign">> => sign(DeviceInfo, PriKey),
<<"sysId">> => ?SYS_ID,
<<"deviceInfo">> => DeviceInfo
},
Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])),
ReceiverPid = self(),
%%
spawn_monitor(fun() ->
Headers = [
{<<"content-type">>, <<"application/json">>}
],
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
iot_logger:write(LoggerPid, [Body, RespBody]),
ReceiverPid ! {ack, Id};
{ok, HttpCode, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
lager:warning("[iot_jinzhi_endpoint] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]);
{error, Reason} ->
lager:warning("[iot_jinzhi_endpoint] send body: ~p, get error is: ~p", [Body, Reason])
end
end).
-spec generate_private_key(PriFile :: string()) -> public_key:private_key().
generate_private_key(PriFile) when is_list(PriFile) ->
PriKeyFile = code:priv_dir(iot) ++ "/" ++ PriFile,
%%
{ok, PriKeyData} = file:read_file(PriKeyFile),
PriDerData = base64:decode(PriKeyData),
public_key:der_decode('PrivateKeyInfo', PriDerData).
%%
-spec sign(M :: #{}, PrivateKey :: public_key:private_key()) -> binary().
sign(M, PrivateKey) when is_map(M) ->
Json = serialize(M),
Hash = iolist_to_binary(io_lib:format("~64.16.0b", [binary:decode_unsigned(crypto:hash(sha256, Json))])),
RsaEncoded = public_key:encrypt_private(Hash, PrivateKey),
base64:encode(RsaEncoded).
%% sign签名
-spec serialize(M :: map()) -> JsonString :: binary().
serialize(M) when is_map(M) ->
L = maps:to_list(M),
L1 = lists:sort(fun({K, _}, {K1, _}) -> K < K1 end, L),
serialize(L1, []).
serialize([], Target) ->
B = iolist_to_binary(lists:join(<<$,>>, lists:reverse(Target))),
<<${, B/binary, $}>>;
serialize([{K, V}|T], Target) ->
V1 = if
is_integer(V) ->
integer_to_binary(V);
is_float(V) ->
float_to_binary(V);
is_binary(V) ->
<<$", V/binary, $">>;
is_boolean(V) andalso V ->
<<"true">>;
is_boolean(V) andalso not V ->
<<"false">>;
is_list(V) ->
Items = lists:map(fun(E) -> serialize(E) end, V),
V0 = iolist_to_binary(lists:join(<<$,>>, Items)),
<<$[, V0/binary, $]>>
end,
Item = <<$", K/binary, $", $:, V1/binary>>,
serialize(T, [Item|Target]).

View File

@ -25,6 +25,7 @@
-record(state, {
mqtt_opts = [],
postman_pid :: undefined | pid(),
logger_pid :: pid(),
%% #north_data的id
cursor = 0 :: integer(),
@ -71,8 +72,10 @@ init([Opts]) ->
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), create_postman),
%%
{ok, LoggerPid} = iot_logger:start_link("north_data"),
{ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined}}.
{ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined, logger_pid = LoggerPid}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
@ -114,7 +117,10 @@ handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPi
end;
%%
handle_event(info, {ack, Id}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum}) ->
handle_event(info, {ack, Id, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum, logger_pid = LoggerPid}) ->
%%
iot_logger:write(LoggerPid, AssocMessage),
ok = mnesia_queue:delete(Id),
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
Actions = case StateName =:= connected of

View File

@ -0,0 +1,26 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04. 7 2023 11:30
%%%-------------------------------------------------------------------
-module(iot_ai_router).
-author("aresei").
-include("iot.hrl").
%% API
-export([route_uuid/3]).
-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return().
route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) ->
%%
case redis_client:hget(RouterUUID, <<"location_code">>) of
{ok, undefined} ->
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
{ok, LocationCode} when is_binary(LocationCode) ->
iot_jinzhi_endpoint:forward(LocationCode, EventType, Params);
{error, Reason} ->
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
end.

View File

@ -76,8 +76,10 @@ start_mnesia() ->
LoadTables = [id_generator, QueueTab],
case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of
true ->
lager:debug("[iot_app] waiting for mnesia start: ~p", [LoadTables]),
%%
mnesia:wait_for_tables(LoadTables, infinity);
mnesia:wait_for_tables(LoadTables, infinity),
lager:debug("[iot_app] waiting for mnesia end");
false ->
lager:warning("[iot_app] tables: ~p not exists, recreate mnesia schema", [LoadTables]),
%% schema

View File

@ -10,7 +10,7 @@
-author("aresei").
%% API
-export([encode/2, decode/2]).
-export([encode/2, decode/2, private_encode/2]).
%%
decode(Data, PrivateKey) when is_binary(Data), is_binary(PrivateKey) ->
@ -27,3 +27,8 @@ encode(Data, PublicKey) when is_binary(Data), is_binary(PublicKey) ->
[Pub] = public_key:pem_decode(PublicKey),
PubKey = public_key:pem_entry_decode(Pub),
public_key:encrypt_public(Data, PubKey).
private_encode(Data, PrivateKey) when is_binary(Data), is_binary(PrivateKey) ->
[Private] = public_key:pem_decode(PrivateKey),
PriKey = public_key:pem_entry_decode(Private),
public_key:encrypt_private(Data, PriKey).

View File

@ -13,7 +13,7 @@
%% API
-export([get_name/1, get_pid/1]).
-export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2]).
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
@ -35,6 +35,20 @@
%%% API
%%%===================================================================
-spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}.
is_alive(DeviceUUID) when is_binary(DeviceUUID) ->
case iot_device:get_pid(DeviceUUID) of
undefined ->
error;
DevicePid when is_pid(DevicePid) ->
case iot_device:is_activated(DevicePid) of
true ->
{ok, DevicePid};
false ->
error
end
end.
-spec get_pid(DeviceUUID :: binary()) -> Pid :: pid() | undefined.
get_pid(DeviceUUID) when is_binary(DeviceUUID) ->
whereis(get_name(DeviceUUID)).

View File

@ -406,6 +406,37 @@ handle_event(cast, {handle, {event, Event0}}, ?STATE_ACTIVATED, State = #state{u
end,
{keep_state, State};
handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
EventText = iot_cipher_aes:decrypt(AES, Event0),
lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]),
case catch jiffy:decode(EventText, [return_maps]) of
#{<<"event_type">> := EventType, <<"params">> := Params = #{<<"device_uuid">> := DeviceUUID, <<"filename">> := Filename}} ->
case iot_device:is_alive(DeviceUUID) of
error ->
ok;
{ok, DevicePid} ->
case iot_util:file_uri(Filename) of
{ok, FileUri} ->
Params1 = maps:put(<<"url">>, FileUri, Params),
%% mysql
Message = iolist_to_binary(jiffy:encode(Params1, [force_utf8])),
ai_event_logs_bo:insert(UUID, DeviceUUID, EventType, Message),
iot_device:change_status(DevicePid, ?DEVICE_ONLINE);
%%
%iot_ai_router:route_uuid(DeviceUUID, EventType, Params);
error ->
lager:warning("[iot_host] host: ~p, event: ~p, filename: ~p invalid or device is not activated", [UUID, EventType, Filename])
end
end;
Event when is_map(Event) ->
lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
Other ->
lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
end,
{keep_state, State};
%%
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
{keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};

View File

@ -12,16 +12,16 @@
-behaviour(gen_server).
%% API
-export([start_link/0, write/1]).
-export([start_link/1, write/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(LOG_FILE, "north_data").
-record(state, {
file_path :: string(),
file_name :: string(),
date :: calendar:date(),
file
}).
@ -29,14 +29,14 @@
%%% API
%%%===================================================================
write(Data) ->
gen_server:cast(?MODULE, {write, Data}).
write(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {write, Data}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
-spec(start_link(FileName :: string()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
start_link(FileName) when is_list(FileName) ->
gen_server:start_link(?MODULE, [FileName], []).
%%%===================================================================
%%% gen_server callbacks
@ -47,12 +47,12 @@ start_link() ->
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
init([FileName]) ->
ensure_dir(),
FilePath = make_file(),
FilePath = make_file(FileName),
{ok, File} = file:open(FilePath, [append, binary]),
{ok, #state{file = File, file_path = FilePath}}.
{ok, #state{file = File, file_name = FileName, date = get_date()}}.
%% @private
%% @doc Handling call messages
@ -73,19 +73,19 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({write, Data}, State = #state{file_path = OldFilePath, file = OldFile}) ->
handle_cast({write, Data}, State = #state{file = OldFile, file_name = FileName, date = Date}) ->
Line = <<(time_prefix())/binary, " ", (format(Data))/binary, $\n>>,
FilePath = make_file(),
case FilePath =:= OldFilePath of
case maybe_new_file(Date) of
true ->
ok = file:write(OldFile, Line),
{noreply, State};
false ->
file:close(OldFile),
FilePath = make_file(FileName),
{ok, File} = file:open(FilePath, [append, binary]),
ok = file:write(File, Line),
{noreply, State#state{file_path = FilePath, file = File}}
{noreply, State#state{file = File, date = get_date()}};
false ->
ok = file:write(OldFile, Line),
{noreply, State}
end.
%% @private
@ -128,12 +128,12 @@ time_prefix() ->
{{Y, M, D}, {H, I, S}} = calendar:local_time(),
iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])).
-spec make_file() -> string().
make_file() ->
-spec make_file(LogFile :: string()) -> string().
make_file(LogFile) when is_list(LogFile) ->
{Year, Month, Day} = erlang:date(),
Suffix = io_lib:format("~b~2..0b~2..0b-", [Year, Month, Day]),
Suffix = io_lib:format("~b~2..0b~2..0b", [Year, Month, Day]),
RootDir = code:root_dir() ++ "/log/",
lists:flatten(RootDir ++ ?LOG_FILE ++ "." ++ Suffix).
lists:flatten(RootDir ++ LogFile ++ "." ++ Suffix).
ensure_dir() ->
RootDir = code:root_dir() ++ "/log/",
@ -143,3 +143,15 @@ ensure_dir() ->
false ->
file:make_dir(RootDir)
end.
%%
-spec get_date() -> Date :: calendar:date().
get_date() ->
{Date, _} = calendar:local_time(),
Date.
%%
-spec maybe_new_file(Date :: calendar:date()) -> boolean().
maybe_new_file({Y, M, D}) ->
{{Y0, M0, D0}, _} = calendar:local_time(),
not (Y =:= Y0 andalso M =:= M0 andalso D =:= D0).

View File

@ -27,18 +27,10 @@ start_link() ->
%% modules => modules()} % optional
init([]) ->
{ok, MqttOpts} = application:get_env(iot, zhongdian),
{ok, JinZhiOpts} = application:get_env(iot, jinzhi),
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
Specs = [
#{
id => 'iot_logger',
start => {'iot_logger', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_logger']
},
#{
id => 'iot_database_buffer',
start => {'iot_database_buffer', start_link, []},
@ -73,6 +65,15 @@ init([]) ->
shutdown => 2000,
type => worker,
modules => ['iot_zd_endpoint']
},
#{
id => 'iot_jinzhi_endpoint',
start => {'iot_jinzhi_endpoint', start_link, [JinZhiOpts]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_jinzhi_endpoint']
}
],

View File

@ -10,7 +10,7 @@
-author("licheng5").
%% API
-export([timestamp/0, number_format/2, current_time/0, timestamp_of_seconds/0, float_to_binary/2, int_format/2]).
-export([timestamp/0, number_format/2, current_time/0, timestamp_of_seconds/0, float_to_binary/2, int_format/2, file_uri/1]).
-export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]).
-export([json_data/1, json_error/2]).
-export([queue_limited_in/3, assert_call/2, assert/2]).
@ -138,3 +138,14 @@ assert(false, F) when is_function(F) ->
F();
assert(false, Msg) ->
throw(Msg).
-spec file_uri(Filename :: binary()) -> error | {ok, FileUri :: binary()}.
file_uri(Filename) when is_binary(Filename) ->
case binary:split(Filename, <<"-">>, [global]) of
[Year, Month, Day | _] ->
{ok, <<"/upload/", Year/binary, $/, Month/binary, $/, Day/binary, $/, Filename/binary>>};
_ ->
error
end.

View File

@ -26,6 +26,7 @@
%%% API
%%%===================================================================
%% 便poolboy调用proplist
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Args :: proplists:proplist()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@ -81,16 +82,15 @@ handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #stat
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]),
ReceiverPid ! {ack, Id},
ReceiverPid ! {ack, Id, {ok, Body, RespBody}},
{noreply, State};
{ok, HttpCode, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
ReceiverPid ! {ack, Id, {error, Body, {HttpCode, RespBody}}},
{noreply, State};
{error, Reason} ->
lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]),
ReceiverPid ! {ack, Id, {error, Body, Reason}},
{noreply, State}
end.

View File

@ -90,8 +90,7 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}},
handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
{{Id, ReceiverPid, AssocMessage}, RestInflight} ->
iot_logger:write(AssocMessage),
ReceiverPid ! {ack, Id},
ReceiverPid ! {ack, Id, AssocMessage},
{noreply, State#state{inflight = RestInflight}};
error ->
{noreply, State}

View File

@ -114,6 +114,10 @@ websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_EVENT:8, Ciph
iot_host:handle(HostPid, {event, CipherEvent}),
{ok, State};
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_AI_EVENT:8, CipherEvent/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
iot_host:handle(HostPid, {ai_event, CipherEvent}),
{ok, State};
%%
websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, 0:32, Body/binary>>}, State = #state{uuid = UUID}) ->
lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]),

View File

@ -44,6 +44,13 @@
{qos, 2}
]},
%% 金智调度系统
{jinzhi, [
{pri_key, "jinzhi_pri.key"},
{url, "http://172.30.6.177:9080/device/push"},
{pool_size, 10}
]},
{pools, [
%% mysql连接池配置
{mysql_iot,
@ -64,7 +71,7 @@
{redis_pool,
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
[
{host, "127.0.0.1"},
{host, "39.98.184.67"},
{port, 26379},
{database, 1}
]
@ -113,9 +120,10 @@
{handlers, [
%% debug | info | warning | error, 日志级别
{lager_console_backend, debug},
{lager_file_backend, [{file, "error.log"}, {level, error}]},
{lager_file_backend, [{file, "debug.log"}, {level, debug}]},
{lager_file_backend, [{file, "info.log"}, {level, info}]}
{lager_file_backend, [{file, "debug.log"}, {level, debug}, {size, 314572800}]},
{lager_file_backend, [{file, "notice.log"}, {level, notice}, {size, 314572800}]},
{lager_file_backend, [{file, "error.log"}, {level, error}, {size, 314572800}]},
{lager_file_backend, [{file, "info.log"}, {level, info}, {size, 314572800}]}
]}
]}

View File

@ -33,6 +33,13 @@
{qos, 2}
]},
%% 金智调度系统
{jinzhi, [
{pri_key, "jinzhi_pri.key"},
{url, "http://172.30.6.177:9080/device/push"},
{pool_size, 10}
]},
{pools, [
%% mysql连接池配置
{mysql_iot,
@ -102,9 +109,10 @@
{handlers, [
%% debug | info | warning | error, 日志级别
{lager_console_backend, debug},
{lager_file_backend, [{file, "error.log"}, {level, error}]},
{lager_file_backend, [{file, "debug.log"}, {level, debug}]},
{lager_file_backend, [{file, "info.log"}, {level, info}]}
{lager_file_backend, [{file, "debug.log"}, {level, debug}, {size, 314572800}]},
{lager_file_backend, [{file, "notice.log"}, {level, notice}, {size, 314572800}]},
{lager_file_backend, [{file, "error.log"}, {level, error}, {size, 314572800}]},
{lager_file_backend, [{file, "info.log"}, {level, info}, {size, 314572800}]}
]}
]}

View File

@ -206,10 +206,458 @@
```text
{
"key": "light_status",
"value": int, 0: 正常, 1: 不, 2: 不亮, 3: 异常波动
"value": int, 0: 正常, 1: 不, 2: 不亮, 3: 异常波动
"type": "SOE",
"unit": 0,
"name": "是否损坏",
"timestamp": int(10)
}
```
# 施耐德强电
## A相电压
```text
{
"key":"voltage_a",
"label":"v",
"name":"A相电压",
"timestamp":1701395303,
"type":"AI",
"unit":0,
"value":9119
}
```
## B相电压
```text
{
"key":"voltage_b",
"label":"v",
"name":"B相电压",
"timestamp":1701395303,
"type":"AI",
"unit":0,
"value":8815
}
```
## C相电压
```text
{
"key":"voltage_c",
"label":"v",
"name":"C相电压",
"timestamp":1701395303,
"type":"AI",
"unit":0,
"value":422
}
```
## A相电流
```text
{
"key":"currency_a",
"label":"a",
"name":"A相电流",
"timestamp":1701395303,
"type":"AI",
"unit":3,
"value":0
}
```
## B相电流
```text
{
"key":"currency_b",
"label":"a",
"name":"B相电流",
"timestamp":1701395303,
"type":"AI",
"unit":3,
"value":11
}
```
## C相电流
```text
{
"key":"currency_c",
"label":"a",
"name":"C相电流",
"timestamp":1701395303,
"type":"AI",
"unit":3,
"value":13
}
```
## A相有功功率
```text
{
"key":"p_a",
"label":"kw",
"name":"A相有功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":0
}
```
## B相有功功率
```text
{
"key":"p_b",
"label":"kw",
"name":"B相有功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":90
}
```
## C相有功功率
```text
{
"key":"p_c",
"label":"kw",
"name":"C相有功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":107
},
```
## 总有功功率
```text
{
"key":"p",
"label":"kw",
"name":"总有功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":10
},
```
## A相无功功率
```text
{
"key":"q_a",
"label":"kvar",
"name":"A相无功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":0
},
```
## B相无功功率
```text
{
"key":"q_b",
"label":"kvar",
"name":"B相无功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":-21
},
```
## C相无功功率
```text
{
"key":"q_c",
"label":"kvar",
"name":"C相无功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":-10
},
```
## 总无功功率
```text
{
"key":"q",
"label":"kvar",
"name":"总无功功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":20
},
```
## A相视在功率
```text
{
"key":"s_a",
"label":"kva",
"name":"A相视在功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":0
},
```
## B相视在功率
```text
{
"key":"s_b",
"label":"kva",
"name":"B相视在功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":93
},
```
## C相视在功率
```text
{
"key":"s_c",
"label":"kva",
"name":"C相视在功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":107
},
```
## 总视在功率
```text
{
"key":"s",
"label":"kva",
"name":"总视在功率",
"timestamp":1701395303,
"type":"AI",
"unit":23,
"value":-32768
},
```
## A相真实功率因数
```text
{
"key":"factor_a",
"label":"",
"name":"A相真实功率因数",
"timestamp":1701395303,
"type":"AI",
"unit":16,
"value":-0
},
```
## B相真实功率因数
```text
{
"key":"factor_b",
"label":"",
"name":"B相真实功率因数",
"timestamp":1701395303,
"type":"AI",
"unit":16,
"value":0.974
},
```
## C相真实功率因数
```text
{
"key":"factor_c",
"label":"",
"name":"C相真实功率因数",
"timestamp":1701395303,
"type":"AI",
"unit":16,
"value":0.995
},
```
## 总真实功率因数
```text
{
"key":"factor",
"label":"",
"name":"总真实功率因数",
"timestamp":1701395303,
"type":"AI",
"unit":16,
"value":1.151
},
```
## 总有功电能
```text
{
"key":"power_sum",
"label":"WH",
"name":"总有功电能",
"timestamp":1701395303,
"type":"AI",
"unit":2,
"value":1428204899413270500
}
```
# 盛帆数据上传格式
盛帆有三种表,每种表格式不一致。
## NB三相表
NB三相表数据上传格式如下
### 正向总有功电
```text
{
"key":"epi",
"name":"正向总有功电能",
"unit":"5",
"label":"kWh",
"value":"321876",
"type": "AI",
"timestamp":"1701396585",
},
```
### A相电压
```text
{
"key":"a_voltage",
"name": "A相电压",
"unit":0,
"label":"V",
"value":224,
"type": "AI",
"timestamp":1701396585
},
```
### A相电流
```text
{
"key":"a_current",
"name": "A相电流",
"unit":3,
"label":"A",
"value":0.122,
"type": "AI",
"timestamp":1701396585
},
```
### B相电压
```text
{
"key":"b_voltage",
"name": "B相电压",
"unit":0,
"label":"V",
"value":224.3,
"type": "AI",
"timestamp":1701396585
},
```
### B相电流
```text
{
"key":"b_current",
"name": "B相电流",
"unit":3,
"label":"A",
"value":0.125,
"type": "AI",
"timestamp":1701396585
},
```
### C相电压
```text
{
"key":"c_voltage",
"name": "C相电压",
"unit":0,
"label":"V",
"value":224.3,
"type": "AI",
"timestamp":1701396585
},
```
### C相电流
```text
{
"key":"c_current",
"name": "C相电流",
"unit":3,
"label":"A",
"value":0.091,
"type": "AI",
"timestamp":1701396585
},
```
### 瞬时总有功功率
```text
{
"key":"active_power",
"name": "瞬时总有功功率",
"unit":23,
"label":"kW",
"value":44.28,
"type": "AI",
"timestamp":1701396585
},
```
### 总功率因数
```text
{
"key":"power_factor",
"name": "总功率因数",
"unit":16,
"label":"",
"value":0.978,
"type": "AI",
"timestamp":1701396585
}
```
## 其余两种表暂时还没看到数据,待补充