fix north data
This commit is contained in:
parent
839abb97db
commit
2fa55bf40e
@ -29,9 +29,6 @@
|
|||||||
-define(METHOD_FEEDBACK_STEP, 16#05).
|
-define(METHOD_FEEDBACK_STEP, 16#05).
|
||||||
-define(METHOD_FEEDBACK_RESULT, 16#06).
|
-define(METHOD_FEEDBACK_RESULT, 16#06).
|
||||||
|
|
||||||
%% 北向数据
|
|
||||||
-define(METHOD_NORTH_DATA, 16#08).
|
|
||||||
|
|
||||||
%% 消息体类型
|
%% 消息体类型
|
||||||
-define(PACKET_REQUEST, 16#01).
|
-define(PACKET_REQUEST, 16#01).
|
||||||
-define(PACKET_RESPONSE, 16#02).
|
-define(PACKET_RESPONSE, 16#02).
|
||||||
@ -46,6 +43,31 @@
|
|||||||
type :: atom()
|
type :: atom()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-record(http_endpoint, {
|
||||||
|
url = <<>> :: binary(),
|
||||||
|
pool_size = 10 :: integer()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(ws_endpoint, {
|
||||||
|
url = <<>> :: binary()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(mqtt_endpoint, {
|
||||||
|
host = <<>> :: binary(),
|
||||||
|
port = 0 :: integer(),
|
||||||
|
username = <<>> :: binary(),
|
||||||
|
password = <<>> :: binary(),
|
||||||
|
topic = <<>> :: binary(),
|
||||||
|
qos = 0 :: integer()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-record(kafka_endpoint, {
|
||||||
|
username = <<>> :: binary(),
|
||||||
|
password = <<>> :: binary(),
|
||||||
|
bootstrap_servers = [] :: [binary()],
|
||||||
|
topic = <<>> :: binary()
|
||||||
|
}).
|
||||||
|
|
||||||
%% 对端配置
|
%% 对端配置
|
||||||
-record(endpoint, {
|
-record(endpoint, {
|
||||||
%% 不同的对端名字要唯一
|
%% 不同的对端名字要唯一
|
||||||
@ -58,7 +80,7 @@
|
|||||||
%% 数据转换规则,基于function
|
%% 数据转换规则,基于function
|
||||||
mapper_fun = fun(_, Data) -> Data end :: fun((binary(), any()) -> any()),
|
mapper_fun = fun(_, Data) -> Data end :: fun((binary(), any()) -> any()),
|
||||||
%% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
|
%% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
|
||||||
config = #{} :: #{},
|
config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{},
|
||||||
%% 更新时间
|
%% 更新时间
|
||||||
updated_at = 0 :: integer(),
|
updated_at = 0 :: integer(),
|
||||||
%% 创建时间
|
%% 创建时间
|
||||||
|
|||||||
@ -46,8 +46,6 @@ handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) ->
|
|||||||
Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()},
|
Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()},
|
||||||
case mnesia_endpoint:insert(Endpoint) of
|
case mnesia_endpoint:insert(Endpoint) of
|
||||||
ok ->
|
ok ->
|
||||||
{ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint),
|
|
||||||
|
|
||||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
{ok, 200, iot_util:json_data(<<"success">>)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[endpoint_handler] create router, get error is: ~p", [Reason]),
|
lager:debug("[endpoint_handler] create router, get error is: ~p", [Reason]),
|
||||||
@ -70,13 +68,6 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh
|
|||||||
|
|
||||||
case mnesia_endpoint:insert(NEndpoint1) of
|
case mnesia_endpoint:insert(NEndpoint1) of
|
||||||
ok ->
|
ok ->
|
||||||
case iot_endpoint:get_pid(Name) of
|
|
||||||
undefined ->
|
|
||||||
%% 重新启动endpoint
|
|
||||||
{ok, _} = iot_endpoint_sup:ensured_endpoint_started(NEndpoint1);
|
|
||||||
Pid when is_pid(Pid) ->
|
|
||||||
iot_endpoint:reload(Pid, NEndpoint1)
|
|
||||||
end,
|
|
||||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
{ok, 200, iot_util:json_data(<<"success">>)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[endpoint_handler] update endpoint, get error is: ~p", [Reason]),
|
lager:debug("[endpoint_handler] update endpoint, get error is: ~p", [Reason]),
|
||||||
@ -139,27 +130,27 @@ make_endpoint([{<<"mapper">>, Mapper} | Params], Endpoint) when is_binary(Mapper
|
|||||||
make_endpoint([{<<"mapper">>, _} | _], _) ->
|
make_endpoint([{<<"mapper">>, _} | _], _) ->
|
||||||
throw(<<"invalid mapper">>);
|
throw(<<"invalid mapper">>);
|
||||||
|
|
||||||
make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
||||||
make_endpoint(Params, Endpoint#endpoint{config = Config});
|
make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}});
|
||||||
make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"https">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"https">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
||||||
make_endpoint(Params, Endpoint#endpoint{config = Config});
|
make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}});
|
||||||
make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
||||||
make_endpoint(Params, Endpoint#endpoint{config = Config});
|
make_endpoint(Params, Endpoint#endpoint{config = #ws_endpoint{url = Url}});
|
||||||
make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint)
|
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint)
|
||||||
when is_binary(Username) andalso Username /= <<>>
|
when is_binary(Username) andalso Username /= <<>>
|
||||||
andalso is_binary(Password) andalso Password /= <<>>
|
andalso is_binary(Password) andalso Password /= <<>>
|
||||||
andalso is_list(BootstrapServers) andalso length(BootstrapServers) > 0
|
andalso is_list(BootstrapServers) andalso length(BootstrapServers) > 0
|
||||||
andalso is_binary(Topic) andalso Topic /= <<>> ->
|
andalso is_binary(Topic) andalso Topic /= <<>> ->
|
||||||
make_endpoint(Params, Endpoint#endpoint{config = Config});
|
make_endpoint(Params, Endpoint#endpoint{config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}});
|
||||||
|
|
||||||
make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint)
|
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint)
|
||||||
when is_binary(Username) andalso Username /= <<>>
|
when is_binary(Username) andalso Username /= <<>>
|
||||||
andalso is_binary(Password) andalso Password /= <<>>
|
andalso is_binary(Password) andalso Password /= <<>>
|
||||||
andalso is_binary(Host) andalso Host /= <<>>
|
andalso is_binary(Host) andalso Host /= <<>>
|
||||||
andalso is_integer(Port) andalso Port > 0
|
andalso is_integer(Port) andalso Port > 0
|
||||||
andalso (Qos == 0 orelse Qos == 1 orelse Qos == 2)
|
andalso (Qos == 0 orelse Qos == 1 orelse Qos == 2)
|
||||||
andalso is_binary(Topic) andalso Topic /= <<>> ->
|
andalso is_binary(Topic) andalso Topic /= <<>> ->
|
||||||
make_endpoint(Params, Endpoint#endpoint{config = Config});
|
make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}});
|
||||||
|
|
||||||
make_endpoint([{<<"config">>, Config} | _], _) ->
|
make_endpoint([{<<"config">>, Config} | _], _) ->
|
||||||
lager:warning("[endpoint_handler] unsupport config: ~p", [Config]),
|
lager:warning("[endpoint_handler] unsupport config: ~p", [Config]),
|
||||||
|
|||||||
@ -49,11 +49,11 @@ get_name(EndpointName) when is_binary(EndpointName) ->
|
|||||||
get_pid(Name) when is_binary(Name) ->
|
get_pid(Name) when is_binary(Name) ->
|
||||||
whereis(get_name(Name)).
|
whereis(get_name(Name)).
|
||||||
|
|
||||||
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Data :: binary()) -> no_return().
|
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list()) -> no_return().
|
||||||
forward(undefined, _, _) ->
|
forward(undefined, _, _) ->
|
||||||
ok;
|
ok;
|
||||||
forward(Pid, LocationCode, Data) when is_pid(Pid), is_binary(LocationCode) ->
|
forward(Pid, LocationCode, Fields) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields) ->
|
||||||
gen_statem:cast(Pid, {forward, LocationCode, Data}).
|
gen_statem:cast(Pid, {forward, LocationCode, Fields}).
|
||||||
|
|
||||||
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
|
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
|
||||||
gen_statem:cast(Pid, {reload, NEndpoint}).
|
gen_statem:cast(Pid, {reload, NEndpoint}).
|
||||||
@ -100,18 +100,24 @@ handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint =
|
|||||||
lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]),
|
lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]),
|
||||||
{keep_state, State#state{endpoint = NEndpoint}};
|
{keep_state, State#state{endpoint = NEndpoint}};
|
||||||
|
|
||||||
handle_event(cast, {reload, NEndpoint}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}}) ->
|
handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}}) ->
|
||||||
lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]),
|
lager:debug("[iot_endpoint] reload endpoint, old: ~p~n, new: ~p", [Endpoint, NEndpoint]),
|
||||||
%% 解除和postman的link关系
|
case mnesia_endpoint:config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of
|
||||||
unlink(PostmanPid),
|
true ->
|
||||||
%% 关闭postman进程
|
lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]),
|
||||||
PostmanPid ! stop,
|
{keep_state, State#state{endpoint = NEndpoint}};
|
||||||
%% 未确认的消息需要暂存
|
false ->
|
||||||
NState = stash(State),
|
%% 解除和postman的link关系
|
||||||
%% 重新建立新的postman
|
unlink(PostmanPid),
|
||||||
erlang:start_timer(0, self(), recreate_postman),
|
%% 关闭postman进程
|
||||||
|
PostmanPid ! stop,
|
||||||
|
%% 未确认的消息需要暂存
|
||||||
|
NState = stash(State),
|
||||||
|
%% 重新建立新的postman
|
||||||
|
erlang:start_timer(0, self(), recreate_postman),
|
||||||
|
|
||||||
{next_state, disconnected, NState#state{endpoint = NEndpoint, postman = undefined}};
|
{next_state, disconnected, NState#state{endpoint = NEndpoint, postman = undefined}}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_event(cast, {forward, LocationCode, Data}, _, State = #state{endpoint = Endpoint = #endpoint{mapper_fun = MapperFun}}) ->
|
handle_event(cast, {forward, LocationCode, Data}, _, State = #state{endpoint = Endpoint = #endpoint{mapper_fun = MapperFun}}) ->
|
||||||
try
|
try
|
||||||
@ -232,38 +238,28 @@ stash(State = #state{ack_map = AckMap, timer_map = TimerMap, q = Q}) ->
|
|||||||
State#state{q = Q1, ack_map = #{}, postman = undefined}.
|
State#state{q = Q1, ack_map = #{}, postman = undefined}.
|
||||||
|
|
||||||
%% 对http和https协议的支持
|
%% 对http和https协议的支持
|
||||||
create_postman(#endpoint{name = Name, config = Config = #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}}) ->
|
create_postman(#endpoint{name = Name, config = #http_endpoint{url = Url, pool_size = PoolSize}}) ->
|
||||||
PoolSize = maps:get(<<"pool_size">>, Config, 10),
|
|
||||||
PoolName = binary_to_atom(<<"http_pool:", Name/binary>>),
|
PoolName = binary_to_atom(<<"http_pool:", Name/binary>>),
|
||||||
{ok, PostmanPid} = http_postman:start_link(self(), Url, PoolName, PoolSize),
|
{ok, PostmanPid} = http_postman:start_link(self(), Url, PoolName, PoolSize),
|
||||||
|
|
||||||
{ok, {http, PostmanPid}};
|
{ok, {http, PostmanPid}};
|
||||||
|
|
||||||
%% 对mqtt协议的支持
|
%% 对mqtt协议的支持
|
||||||
create_postman(#endpoint{name = Name, config = Config = #{<<"protocol">> := <<"mqtt">>,
|
create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) ->
|
||||||
<<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}}) ->
|
Node = atom_to_binary(node()),
|
||||||
|
ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>,
|
||||||
ClientId = case maps:is_key(<<"client_id">>, Config) of
|
|
||||||
true ->
|
|
||||||
maps:get(<<"client_id">>, Config);
|
|
||||||
false ->
|
|
||||||
Node = atom_to_binary(node()),
|
|
||||||
<<"mqtt-client-", Node/binary, "-", Name/binary>>
|
|
||||||
end,
|
|
||||||
Keepalive = maps:get(<<"keepalive">>, Config, 86400),
|
|
||||||
RetryInterval = maps:get(<<"retry_interval">>, Config, 5),
|
|
||||||
Opts = [
|
Opts = [
|
||||||
{clientid, ClientId},
|
{clientid, ClientId},
|
||||||
{host, as_string(Host)},
|
{host, binary_to_list(Host)},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
{tcp_opts, []},
|
{tcp_opts, []},
|
||||||
{username, as_string(Username)},
|
{username, unicode:characters_to_list(Username)},
|
||||||
{password, as_string(Password)},
|
{password, unicode:characters_to_list(Password)},
|
||||||
{keepalive, Keepalive},
|
{keepalive, 86400},
|
||||||
{auto_ack, true},
|
{auto_ack, true},
|
||||||
{connect_timeout, 5000},
|
{connect_timeout, 5000},
|
||||||
{proto_ver, v5},
|
{proto_ver, v5},
|
||||||
{retry_interval, RetryInterval}
|
{retry_interval, 5}
|
||||||
],
|
],
|
||||||
|
|
||||||
{ok, PostmanPid} = mqtt_postman:start_link(self(), Opts, Topic, Qos),
|
{ok, PostmanPid} = mqtt_postman:start_link(self(), Opts, Topic, Qos),
|
||||||
@ -271,9 +267,3 @@ create_postman(#endpoint{name = Name, config = Config = #{<<"protocol">> := <<"m
|
|||||||
{ok, {mqtt, PostmanPid}};
|
{ok, {mqtt, PostmanPid}};
|
||||||
create_postman(#endpoint{}) ->
|
create_postman(#endpoint{}) ->
|
||||||
throw(<<"not supported">>).
|
throw(<<"not supported">>).
|
||||||
|
|
||||||
%% 转出成string
|
|
||||||
as_string(S) when is_list(S) ->
|
|
||||||
S;
|
|
||||||
as_string(S) when is_binary(S) ->
|
|
||||||
unicode:characters_to_list(S).
|
|
||||||
134
apps/iot/src/iot_endpoint_monitor.erl
Normal file
134
apps/iot/src/iot_endpoint_monitor.erl
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 25. 7月 2023 16:39
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_endpoint_monitor).
|
||||||
|
-author("aresei").
|
||||||
|
-include("iot.hrl").
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/0]).
|
||||||
|
-export([get_all_endpoints/0]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
get_all_endpoints() ->
|
||||||
|
case ets:lookup(endpoint_cache, endpoints) of
|
||||||
|
[] ->
|
||||||
|
[];
|
||||||
|
[{_, Endpoints} | _] ->
|
||||||
|
Endpoints
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
|
-spec(start_link() ->
|
||||||
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Initializes the server
|
||||||
|
-spec(init(Args :: term()) ->
|
||||||
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term()} | ignore).
|
||||||
|
init([]) ->
|
||||||
|
mnesia:subscribe({table, endpoint, simple}),
|
||||||
|
ets:new(endpoint_cache, [public, set, named_table, {keypos, 1}]),
|
||||||
|
%% 加载信息到缓存中
|
||||||
|
load_endpoints(),
|
||||||
|
{ok, #state{}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling call messages
|
||||||
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
|
State :: #state{}) ->
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_call(_Request, _From, State = #state{}) ->
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling cast messages
|
||||||
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_cast(_Request, State = #state{}) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling all non call/cast messages
|
||||||
|
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State = #state{}) ->
|
||||||
|
lager:debug("[iot_endpoint_monitor] delete_event endpoint name: ~p, tid: ~p", [Name, Tid]),
|
||||||
|
iot_endpoint_sup:delete_endpoint(Name),
|
||||||
|
load_endpoints(),
|
||||||
|
|
||||||
|
{noreply, State};
|
||||||
|
handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State = #state{}) ->
|
||||||
|
lager:debug("[iot_endpoint_monitor] write_event new endpoint: ~p, tid: ~p", [Endpoint, Tid]),
|
||||||
|
case iot_endpoint:get_pid(Name) of
|
||||||
|
undefined ->
|
||||||
|
%% 重新启动endpoint
|
||||||
|
{ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint);
|
||||||
|
Pid when is_pid(Pid) ->
|
||||||
|
iot_endpoint:reload(Pid, Endpoint)
|
||||||
|
end,
|
||||||
|
load_endpoints(),
|
||||||
|
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
|
%% with Reason. The return value is ignored.
|
||||||
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
|
State :: #state{}) -> term()).
|
||||||
|
terminate(_Reason, _State = #state{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
|
Extra :: term()) ->
|
||||||
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec load_endpoints() -> no_return().
|
||||||
|
load_endpoints() ->
|
||||||
|
Endpoints = mnesia_endpoint:get_all_endpoints(),
|
||||||
|
true = ets:insert(endpoint_cache, {endpoints, Endpoints}).
|
||||||
@ -39,6 +39,7 @@ stat() ->
|
|||||||
|
|
||||||
delete_endpoint(Name) when is_binary(Name) ->
|
delete_endpoint(Name) when is_binary(Name) ->
|
||||||
Id = iot_endpoint:get_name(Name),
|
Id = iot_endpoint:get_name(Name),
|
||||||
|
supervisor:terminate_child(?MODULE, Id),
|
||||||
supervisor:delete_child(?MODULE, Id).
|
supervisor:delete_child(?MODULE, Id).
|
||||||
|
|
||||||
child_spec(Endpoint) ->
|
child_spec(Endpoint) ->
|
||||||
|
|||||||
@ -239,57 +239,39 @@ handle_event({call, From}, {create_session, PubKey}, _StateName, State = #state{
|
|||||||
|
|
||||||
{next_state, session, State#state{status = ?HOST_STATUS_ONLINE}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
{next_state, session, State#state{status = ?HOST_STATUS_ONLINE}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||||
|
|
||||||
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理
|
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到
|
||||||
handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||||
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
||||||
case catch jiffy:decode(PlainData, [return_maps]) of
|
case catch jiffy:decode(PlainData, [return_maps]) of
|
||||||
Infos when is_list(Infos) ->
|
Info = #{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags} when is_binary(ServiceName) ->
|
||||||
lager:debug("[iot_host] the data is: ~p", [Infos]),
|
%% 查找终端设备对应的点位信息
|
||||||
%% 记录数据, tags里面包含了 <<"device_uuid">> 信息
|
RouterUUID = router_uuid(Info, UUID),
|
||||||
lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) ->
|
case mnesia_kv:hget(RouterUUID, <<"location_code">>) of
|
||||||
Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()),
|
none ->
|
||||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
|
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, not found", [UUID, RouterUUID]);
|
||||||
Measurement = <<"metric">>,
|
{error, Reason} ->
|
||||||
|
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]);
|
||||||
|
{ok, LocationCode} ->
|
||||||
|
iot_router:route(LocationCode, FieldsList)
|
||||||
|
end,
|
||||||
|
|
||||||
Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList),
|
%% 数据写入influxdb
|
||||||
Precision = influx_client:get_precision(Timestamp),
|
NTags = case maps:is_key(<<"device_uuid">>, Info) of
|
||||||
|
true ->
|
||||||
|
Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => maps:get(<<"device_uuid">>, Info)};
|
||||||
|
false ->
|
||||||
|
Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}
|
||||||
|
end,
|
||||||
|
Measurement = <<"metric">>,
|
||||||
|
Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList),
|
||||||
|
Precision = influx_client:get_precision(Timestamp),
|
||||||
|
|
||||||
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end)
|
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end);
|
||||||
end, Infos);
|
|
||||||
Other ->
|
Other ->
|
||||||
lager:debug("[iot_host] the data is invalid json: ~p", [Other])
|
lager:debug("[iot_host] the data is invalid json: ~p", [Other])
|
||||||
end,
|
end,
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% TODO 处理微服务的北向数据
|
|
||||||
handle_event(cast, {handle, {north_data, Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
|
||||||
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
|
||||||
lager:debug("[iot_host] the north_data is: ~p", [PlainData]),
|
|
||||||
%% 查找主机对应的点位信息
|
|
||||||
case mnesia_kv:hget(UUID, <<"location_code">>) of
|
|
||||||
none ->
|
|
||||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, not found", [UUID]);
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [UUID, Reason]);
|
|
||||||
{ok, LocationCode} ->
|
|
||||||
iot_router:route(LocationCode, Data)
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(cast, {handle, {north_data, {DeviceUUID, Data}}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
|
||||||
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
|
||||||
lager:debug("[iot_host] the north_data uuid: ~p, device_uuid: ~p, is: ~p", [UUID, DeviceUUID, PlainData]),
|
|
||||||
%% 查找终端设备对应的点位信息
|
|
||||||
case mnesia_kv:hget(DeviceUUID, <<"location_code">>) of
|
|
||||||
none ->
|
|
||||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, device_uuid: ~p, not found", [UUID, DeviceUUID]);
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, device_uuid: ~p, get error: ~p", [UUID, DeviceUUID, Reason]);
|
|
||||||
{ok, LocationCode} ->
|
|
||||||
iot_router:route(LocationCode, Data)
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||||
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
|
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
|
||||||
case catch jiffy:decode(MetricsInfo, [return_maps]) of
|
case catch jiffy:decode(MetricsInfo, [return_maps]) of
|
||||||
@ -397,3 +379,9 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% 获取到分发的路由
|
||||||
|
router_uuid(#{<<"device_uuid">> := DeviceUUID}, _) when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
|
||||||
|
DeviceUUID;
|
||||||
|
router_uuid(_, UUID) ->
|
||||||
|
UUID.
|
||||||
|
|||||||
@ -13,21 +13,21 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([route/2]).
|
-export([route/2]).
|
||||||
|
|
||||||
-spec route(LocationCode :: binary(), Data :: binary()) -> ok.
|
-spec route(LocationCode :: binary(), Fields :: list()) -> ok.
|
||||||
route(LocationCode, Data) when is_binary(LocationCode), is_binary(Data) ->
|
route(LocationCode, Fields) when is_binary(LocationCode), is_list(Fields) ->
|
||||||
Endpoints = mnesia_endpoint:get_all_endpoints(),
|
Endpoints = iot_endpoint_monitor:get_all_endpoints(),
|
||||||
router0(Endpoints, LocationCode, Data).
|
router0(Endpoints, LocationCode, Fields).
|
||||||
router0([], _, _) ->
|
router0([], _, _) ->
|
||||||
ok;
|
ok;
|
||||||
router0([#endpoint{matcher = Regexp, name = Name}|Endpoints], LocationCode, Data) ->
|
router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields) ->
|
||||||
{ok, MP} = re:compile(Regexp),
|
{ok, MP} = re:compile(Regexp),
|
||||||
case re:run(LocationCode, MP, [{capture, all, list}]) of
|
case re:run(LocationCode, MP, [{capture, all, list}]) of
|
||||||
nomatch ->
|
nomatch ->
|
||||||
router0(Endpoints, LocationCode, Data);
|
router0(Endpoints, LocationCode, Fields);
|
||||||
{match, _} ->
|
{match, _} ->
|
||||||
lager:debug("[iot_router] match endpoint: ~p", [Name]),
|
lager:debug("[iot_router] match endpoint: ~p", [Name]),
|
||||||
Pid = iot_endpoint:get_pid(Name),
|
Pid = iot_endpoint:get_pid(Name),
|
||||||
iot_endpoint:forward(Pid, LocationCode, Data),
|
iot_endpoint:forward(Pid, LocationCode, Fields),
|
||||||
%% 继续匹配其他的Endpoint
|
%% 继续匹配其他的Endpoint
|
||||||
router0(Endpoints, LocationCode, Data)
|
router0(Endpoints, LocationCode, Fields)
|
||||||
end.
|
end.
|
||||||
@ -28,6 +28,15 @@ start_link() ->
|
|||||||
init([]) ->
|
init([]) ->
|
||||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||||
ChildSpecs = [
|
ChildSpecs = [
|
||||||
|
#{
|
||||||
|
id => 'iot_endpoint_monitor',
|
||||||
|
start => {'iot_endpoint_monitor', start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 2000,
|
||||||
|
type => supervisor,
|
||||||
|
modules => ['iot_endpoint_monitor']
|
||||||
|
},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'iot_endpoint_sup',
|
id => 'iot_endpoint_sup',
|
||||||
start => {'iot_endpoint_sup', start_link, []},
|
start => {'iot_endpoint_sup', start_link, []},
|
||||||
|
|||||||
@ -13,7 +13,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]).
|
-export([get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]).
|
||||||
-export([to_map/1]).
|
-export([to_map/1, config_equals/2]).
|
||||||
|
|
||||||
-spec get_all_endpoints() -> [#endpoint{}].
|
-spec get_all_endpoints() -> [#endpoint{}].
|
||||||
get_all_endpoints() ->
|
get_all_endpoints() ->
|
||||||
@ -56,14 +56,37 @@ delete(Name) when is_binary(Name) ->
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% 判断2个endpoint的配置项是否相同
|
||||||
|
-spec config_equals(any(), any()) -> boolean().
|
||||||
|
config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) ->
|
||||||
|
true;
|
||||||
|
config_equals(#ws_endpoint{url = Url}, #ws_endpoint{url = Url}) ->
|
||||||
|
true;
|
||||||
|
config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic},
|
||||||
|
#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
|
||||||
|
true;
|
||||||
|
config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos},
|
||||||
|
#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
|
||||||
|
true;
|
||||||
|
config_equals(_, _) ->
|
||||||
|
false.
|
||||||
|
|
||||||
to_map(#endpoint{name = Name, title = Title, matcher = Matcher, mapper = Mapper, config = Config, updated_at = UpdatedAt, created_at = CreatedAt}) ->
|
to_map(#endpoint{name = Name, title = Title, matcher = Matcher, mapper = Mapper, config = Config, updated_at = UpdatedAt, created_at = CreatedAt}) ->
|
||||||
#{
|
#{
|
||||||
<<"name">> => Name,
|
<<"name">> => Name,
|
||||||
<<"title">> => Title,
|
<<"title">> => Title,
|
||||||
<<"matcher">> => Matcher,
|
<<"matcher">> => Matcher,
|
||||||
<<"mapper">> => Mapper,
|
<<"mapper">> => Mapper,
|
||||||
<<"config">> => Config,
|
<<"config">> => config_map(Config),
|
||||||
<<"updated_at">> => UpdatedAt,
|
<<"updated_at">> => UpdatedAt,
|
||||||
<<"created_at">> => CreatedAt
|
<<"created_at">> => CreatedAt
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
config_map(#http_endpoint{url = Url}) ->
|
||||||
|
#{<<"protocol">> => <<"http">>, <<"args">> => #{<<"url">> => Url}};
|
||||||
|
config_map(#ws_endpoint{url = Url}) ->
|
||||||
|
#{<<"protocol">> => <<"ws">>, <<"args">> => #{<<"url">> => Url}};
|
||||||
|
config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
|
||||||
|
#{<<"protocol">> => <<"kafka">>, <<"args">> => #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}};
|
||||||
|
config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
|
||||||
|
#{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}.
|
||||||
|
|||||||
@ -16,9 +16,8 @@
|
|||||||
-export([insert_endpoints/0]).
|
-export([insert_endpoints/0]).
|
||||||
|
|
||||||
insert_endpoints() ->
|
insert_endpoints() ->
|
||||||
Mapper0 = "fun(LocationCode, Data) ->
|
Mapper0 = "fun(LocationCode, Fields) ->
|
||||||
Json = jiffy:decode(Data, [return_maps]),
|
Bin = jiffy:encode(Fields#{<<\"location_code\">> => LocationCode}, [force_utf8]),
|
||||||
Bin = jiffy:encode(Json#{<<\"location_code\">> => LocationCode}, [force_utf8]),
|
|
||||||
iolist_to_binary(Bin)
|
iolist_to_binary(Bin)
|
||||||
end.",
|
end.",
|
||||||
|
|
||||||
@ -30,7 +29,7 @@ insert_endpoints() ->
|
|||||||
matcher = <<"test12*">>,
|
matcher = <<"test12*">>,
|
||||||
mapper = Mapper,
|
mapper = Mapper,
|
||||||
mapper_fun = F,
|
mapper_fun = F,
|
||||||
config = #{<<"protocol">> => <<"http">>, <<"args">> => #{<<"url">> => <<"http://localhost:18080/test/receiver">>}},
|
config = #http_endpoint{url = <<"http://localhost:18080/test/receiver">>},
|
||||||
created_at = iot_util:timestamp_of_seconds()
|
created_at = iot_util:timestamp_of_seconds()
|
||||||
}),
|
}),
|
||||||
|
|
||||||
@ -40,16 +39,13 @@ insert_endpoints() ->
|
|||||||
matcher = <<"test*">>,
|
matcher = <<"test*">>,
|
||||||
mapper = Mapper,
|
mapper = Mapper,
|
||||||
mapper_fun = F,
|
mapper_fun = F,
|
||||||
config = #{
|
config = #mqtt_endpoint{
|
||||||
<<"protocol">> => <<"mqtt">>,
|
host = <<"39.98.184.67">>,
|
||||||
<<"args">> => #{
|
port = 1883,
|
||||||
<<"host">> => <<"39.98.184.67">>,
|
username = <<"test">>,
|
||||||
<<"port">> => 1883,
|
password = <<"test1234">>,
|
||||||
<<"username">> => <<"test">>,
|
topic = <<"CET/NX/${location_code}/upload">>,
|
||||||
<<"password">> => <<"test1234">>,
|
qos = 2
|
||||||
<<"topic">> => <<"CET/NX/${location_code}/upload">>,
|
|
||||||
<<"qos">> => 2
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
created_at = iot_util:timestamp_of_seconds()
|
created_at = iot_util:timestamp_of_seconds()
|
||||||
}),
|
}),
|
||||||
|
|||||||
@ -92,17 +92,6 @@ websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_DATA:8, Data/
|
|||||||
iot_host:handle(HostPid, {data, Data}),
|
iot_host:handle(HostPid, {data, Data}),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
%% 北向数据处理
|
|
||||||
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_NORTH_DATA:8, 0:8, Data/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) ->
|
|
||||||
lager:debug("[ws_channel] north_data uuid: ~p, data: ~p", [UUID, Data]),
|
|
||||||
iot_host:handle(HostPid, {north_data, Data}),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_NORTH_DATA:8, 1:1, Len:7, DeviceUUID:Len/binary, Data/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) ->
|
|
||||||
lager:debug("[ws_channel] north_data uuid: ~p, data: ~p", [UUID, Data]),
|
|
||||||
iot_host:handle(HostPid, {north_data, {DeviceUUID, Data}}),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_PING:8, CipherMetric/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) ->
|
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_PING:8, CipherMetric/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) ->
|
||||||
lager:debug("[ws_channel] ping uuid: ~p", [UUID]),
|
lager:debug("[ws_channel] ping uuid: ~p", [UUID]),
|
||||||
iot_host:handle(HostPid, {ping, CipherMetric}),
|
iot_host:handle(HostPid, {ping, CipherMetric}),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user