diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 5995dda..a3907f0 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -30,6 +30,8 @@ -record(memory_metric, { %% 使用量 used = 0, + %% 空余 + free = 0, %% 总量 total = 0 }). @@ -38,6 +40,8 @@ -record(disk_metric, { %% 使用量 used = 0, + %% 空余 + free = 0, %% 总量 total = 0 }). @@ -84,6 +88,12 @@ interfaces = [] }). +%% id生成器 +-record(id_generator, { + name :: string(), + seq = 0 :: integer() +}). + %% 主机定义 -record(host, { %% ID @@ -94,6 +104,8 @@ model :: binary(), %% 单元网格编号 cell_id :: integer(), + %% rsa公钥 + public_rsa :: binary(), %% aes的key, 后续通讯需要基于这个加密 aes :: binary(), metric = #host_metric{}, @@ -154,6 +166,61 @@ status = 0 }). +%% scenario 应用场景 +-record(scenario, { + %% 场景id,自增id + scenario_id :: integer(), + %% 名称 + name :: binary(), + %% 场景描述 + desc :: binary(), + %% 场景规则数据 + rule :: binary(), + %% 最后更新时间 + update_ts = 0 :: integer(), + %% 状态 + status :: integer() +}). + +%% 场景部署关系表 +-record(scenario_deploy, { + %% id,自增id + deploy_id :: integer(), + %% 场景id + scenario_id :: integer(), + %% 主机id + host_id :: binary(), + %% 创建时间 + create_ts = 0 :: integer(), + %% 更新时间 + update_ts = 0 :: integer(), + %% 状态 + status = 0:: integer() +}). + +%% 工单管理 +-record(issue, { + issue_id :: integer(), + %% 工单名称 + name :: binary(), + %% 创建用户 + uid :: integer(), + %% 部署类型 + deploy_type :: integer(), + %% 关联的id, 微服务id或者场景id + assoc_id :: any(), + %% 主机的id列表 + hosts = [] :: list(), + %% 超时时间 + timeout = 0 :: integer(), + %% 部署结果 + results = [], + %% 创建时间 + create_ts = 0 :: integer(), + %% 工单状态 + status = 0 :: integer() +}). + -record(http_endpoint, { url = <<>> }). @@ -164,6 +231,7 @@ %% 数据转换规则表 -record(router, { + router_id :: integer(), %% 名称 name = <<>>, %% 数据过滤规则 diff --git a/apps/iot/src/http_api_handler.erl b/apps/iot/src/http_handler/http_api_handler.erl similarity index 88% rename from apps/iot/src/http_api_handler.erl rename to apps/iot/src/http_handler/http_api_handler.erl index 7af6d71..24c3e96 100644 --- a/apps/iot/src/http_api_handler.erl +++ b/apps/iot/src/http_handler/http_api_handler.erl @@ -18,5 +18,6 @@ handle_request("GET", "/api/booking", _, _Params) -> {ok, 200, iot_util:json_data(<<"success">>)}; -handle_request("POST", "/api/booking", _, _Params) -> +handle_request("POST", _, _, Params) -> + lager:debug("body is: ~p", [Params]), {ok, 200, iot_util:json_data(<<"success">>)}. \ No newline at end of file diff --git a/apps/iot/src/http_host_handler.erl b/apps/iot/src/http_handler/http_host_handler.erl similarity index 78% rename from apps/iot/src/http_host_handler.erl rename to apps/iot/src/http_handler/http_host_handler.erl index 75bdeaf..3ab3a8a 100644 --- a/apps/iot/src/http_host_handler.erl +++ b/apps/iot/src/http_handler/http_host_handler.erl @@ -72,7 +72,6 @@ handle_request(_, "/host/list", Params, PostParams) -> handle_request("GET", "/host/detail", #{<<"id">> := HostId}, _) -> lager:debug("[host_handler] detail id is: ~p", [HostId]), - timer:tc(), case host_model:get_host(HostId) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; @@ -87,7 +86,23 @@ handle_request("GET", "/host/detail", #{<<"id">> := HostId}, _) -> Services = lists:map(fun(S) -> service_model:to_map(S) end, Services0), HostInfo2 = maps:put(<<"services">>, Services, HostInfo1), - {ok, 200, iot_util:json_data(HostInfo2)} + %% 获取部署应用场景 + {ok, DeployList0} = scenario_deploy_model:get_host_deploy_list(HostId), + DeployList = lists:map(fun(E) -> scenario_deploy_model:to_map(E) end, DeployList0), + %% 获取部署的场景信息 + DeployList1 = lists:map(fun(DeployInfo = #{<<"scenario_id">> := ScenarioId}) -> + case scenario_model:get_scenario(ScenarioId) of + {ok, Scenario} -> + ScenarioInfo = scenario_model:to_map(Scenario), + DeployInfo#{<<"scenario_info">> => ScenarioInfo}; + undefined -> + DeployInfo#{<<"scenario_info">> => #{}} + end + end, DeployList), + + HostInfo3 = maps:put(<<"deploy_list">>, HostInfo2, DeployList1), + + {ok, 200, iot_util:json_data(HostInfo3)} end; handle_request("POST", "/host/update", _, _Params) -> diff --git a/apps/iot/src/http_handler/http_iot_handler.erl b/apps/iot/src/http_handler/http_iot_handler.erl new file mode 100644 index 0000000..316d576 --- /dev/null +++ b/apps/iot/src/http_handler/http_iot_handler.erl @@ -0,0 +1,163 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 3月 2023 14:29 +%%%------------------------------------------------------------------- +-module(http_iot_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +handle_request("GET", "/api/booking", _, _Params) -> + {ok, 200, iot_util:json_data(<<"success">>)}; + +%% 下发参数 +handle_request("POST", "/iot/send_params", _, PostParams = #{<<"host_id">> := HostId, <<"service_name">> := ServiceName, <<"params">> := Params}) -> + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 1, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"to">> => ServiceName, + <<"t">> => 10, + <<"m">> => Params + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 下发采集项 +handle_request("POST", "/iot/send_metrics", _, + PostParams = #{<<"host_id">> := HostId, <<"service_name">> := ServiceName, <<"metrics">> := Metrics}) -> + + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 2, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"to">> => ServiceName, + <<"t">> => 10, + <<"m">> => Metrics + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 下发微服务 +handle_request("POST", "/iot/send_mirco_service", _, + PostParams = #{<<"host_id">> := HostId, <<"args">> := Args}) -> + + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 3, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"t">> => 10, + <<"m">> => Args + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 下发数据流图 +handle_request("POST", "/iot/send_data_flow", _, + PostParams = #{<<"host_id">> := HostId, <<"args">> := Args}) -> + + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 4, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"t">> => 10, + <<"m">> => Args + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 处理命令下发 +handle_request("POST", "/iot/send_command", _, Params = #{<<"host_id">> := HostId}) -> + lager:debug("body is: ~p", [Params]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes, public_rsa = PubKey}} -> + Reply = #{ + <<"a">> => true, + <<"aes">> => Aes, + <<"reply">> => <<"client.reply.", HostId/binary>> + }, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + %% TODO 发送消息到终端 + lager:debug("enc_reply is: ~p", [EncReply]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 处理主机的授权 +handle_request("POST", "/iot/host_auth", _, Params = #{<<"host_id">> := HostId}) -> + lager:debug("body is: ~p", [Params]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes, public_rsa = PubKey}} -> + Reply = #{ + <<"a">> => true, + <<"aes">> => Aes, + <<"reply">> => <<"client.reply.", HostId/binary>> + }, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + %% TODO 发送消息到终端 + lager:debug("enc_reply is: ~p", [EncReply]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end. \ No newline at end of file diff --git a/apps/iot/src/http_handler/http_router_handler.erl b/apps/iot/src/http_handler/http_router_handler.erl new file mode 100644 index 0000000..e292b11 --- /dev/null +++ b/apps/iot/src/http_handler/http_router_handler.erl @@ -0,0 +1,74 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(http_router_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_request(_, "/router/list", Params, PostParams) -> + Page0 = maps:get(<<"page">>, Params, <<"1">>), + Size0 = maps:get(<<"size">>, Params, <<"10">>), + Page = binary_to_integer(Page0), + Size = binary_to_integer(Size0), + + true = Page > 0 andalso Size > 0, + Start = (Page - 1) * Size, + + %% 处理查询条件 + Name = maps:get(<<"name">>, PostParams, <<"">>), + + MatchHead = #router{name = '$1', _ = '_'}, + Guard = [], + Guard1 = case Name =/= <<"">> of + true -> + [{'=:=', '$1', Name}|Guard]; + false -> + Guard + end, + + Result = ['$_'], + + case router_model:get_routers({MatchHead, Guard1, Result}, Start, Size) of + {ok, Routers, TotalNum} -> + Response = #{ + <<"routers">> => lists:map(fun(R) -> router_model:to_map(R) end, Routers), + <<"total_num">> => TotalNum + }, + + {ok, 200, iot_util:json_data(Response)}; + {error, Reason} -> + lager:warning("[host_handler] get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, <<"database error">>)} + end; + +handle_request("GET", "/router/detail", #{<<"router_id">> := RouterId}, _) -> + case router_model:get_router(RouterId) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"router not found">>)}; + {ok, Router} -> + RouterInfo = router_model:to_map(Router), + + {ok, 200, iot_util:json_data(RouterInfo)} + end; + +handle_request("POST", "/router/changer_status", _, Params = #{<<"router_id">> := RouterId, <<"status">> := NStatus}) -> + lager:debug("[router_handler] post params is: ~p", [Params]), + + case router_model:change_status(RouterId, NStatus) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, _} -> + {ok, 200, iot_util:json_error(404, <<"error">>)} + end. \ No newline at end of file diff --git a/apps/iot/src/http_handler/http_scenario_handler.erl b/apps/iot/src/http_handler/http_scenario_handler.erl new file mode 100644 index 0000000..7f9acfe --- /dev/null +++ b/apps/iot/src/http_handler/http_scenario_handler.erl @@ -0,0 +1,106 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(http_scenario_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_request(_, "/scenario/list", Params, PostParams) -> + Page0 = maps:get(<<"page">>, Params, <<"1">>), + Size0 = maps:get(<<"size">>, Params, <<"10">>), + Page = binary_to_integer(Page0), + Size = binary_to_integer(Size0), + + true = Page > 0 andalso Size > 0, + Start = (Page - 1) * Size, + + %% 处理查询条件 + Name = maps:get(<<"name">>, PostParams, <<"">>), + + MatchHead = #scenario{name = '$1', _ = '_'}, + Guard = [], + Guard1 = case Name =/= <<"">> of + true -> + [{'=:=', '$1', Name}|Guard]; + false -> + Guard + end, + Result = ['$_'], + + case scenario_model:get_scenario_list({MatchHead, Guard1, Result}, Start, Size) of + {ok, ScenarioList, TotalNum} -> + Response = #{ + <<"scenarios">> => lists:map(fun(Host) -> scenario_model:to_map(Host) end, ScenarioList), + <<"total_num">> => TotalNum + }, + {ok, 200, iot_util:json_data(Response)}; + {error, Reason} -> + lager:warning("[http_scenario_handler] get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, <<"database error">>)} + end; + +handle_request("GET", "/scenario/detail", #{<<"id">> := ScenarioId0}, _) -> + ScenarioId = binary_to_integer(ScenarioId0), + case scenario_model:get_scenario(ScenarioId) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"scenario not found">>)}; + {ok, Scenario} -> + ScenarioInfo = scenario_model:to_map(Scenario), + %% 获取场景下部署的主机列表 + {ok, DeployList0} = scenario_deploy_model:get_scenario_deploy_list(ScenarioId), + DeployList = lists:map(fun(E) -> scenario_deploy_model:to_map(E) end, DeployList0), + ScenarioInfo1 = maps:put(<<"deploy_list">>, ScenarioInfo, DeployList), + %% 获取部署的主机信息 + ScenarioInfo2 = lists:map(fun(Info = #{<<"host_id">> := HostId}) -> + case host_model:get_host(HostId) of + {ok, Host} -> + HostInfo = host_model:to_map(Host), + Info#{<<"host_info">> => HostInfo}; + undefined -> + Info#{<<"host_info">> => #{}} + end + end, ScenarioInfo1), + + {ok, 200, iot_util:json_data(ScenarioInfo2)} + end; + +handle_request("POST", "/scenario/change_status", _, #{<<"scenario_id">> := ScenarioId, <<"status">> := Status}) when is_integer(ScenarioId), is_integer(Status) -> + case scenario_model:change_status(ScenarioId, Status) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} when is_binary(Reason) -> + lager:warning("[http_scenario_handler] change_status get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, Reason)} + end; + +%% 部署 +handle_request("POST", "/scenario/deploy", _, #{<<"scenario_id">> := ScenarioId, <<"status">> := Status}) when is_integer(ScenarioId), is_integer(Status) -> + case scenario_model:change_status(ScenarioId, Status) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} when is_binary(Reason) -> + lager:warning("[http_scenario_handler] change_status get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, Reason)} + end; + +%% 删除?? +handle_request("POST", "/scenario/change_status", _, #{<<"scenario_id">> := ScenarioId, <<"status">> := Status}) when is_integer(ScenarioId), is_integer(Status) -> + case scenario_model:change_status(ScenarioId, Status) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} when is_binary(Reason) -> + lager:warning("[http_scenario_handler] change_status get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, Reason)} + end. \ No newline at end of file diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 5df4b41..9d1ffb3 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -13,8 +13,11 @@ parse_trans, hackney, poolboy, + mnesia, crypto, + public_key, + ssl, kernel, stdlib ]}, diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 1a118a2..c771355 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -8,11 +8,15 @@ -behaviour(application). -export([start/2, stop/1]). +-export([start_http_server/0]). start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), %% 启动数据库 mnesia:start(), + Tables = mnesia:system_info(tables), + iot_util:assert_exec(lists:member(router, [Tables]), fun() -> mnesia:wait_for_tables([router], infinity) end), + %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), @@ -35,8 +39,10 @@ start_http_server() -> Dispatcher = cowboy_router:compile([ {'_', [ - {"/host/[...]", http_protocol, [http_host_handler]} - ]} + {"/host/[...]", http_protocol, [http_host_handler]}, + {"/api/[...]", http_protocol, [http_api_handler]}, + {"/router/[...]", http_protocol, [http_router_handler]} + ]} ]), TransOpts = [ {port, Port}, diff --git a/apps/iot/src/iot_emqtt_client.erl b/apps/iot/src/iot_emqtt_client.erl new file mode 100644 index 0000000..60a53f6 --- /dev/null +++ b/apps/iot/src/iot_emqtt_client.erl @@ -0,0 +1,119 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 3月 2023 15:42 +%%%------------------------------------------------------------------- +-module(iot_emqtt_client). +-author("licheng5"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-export([publish/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + conn_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +publish(Topic, Message, Qos) when is_binary(Topic), is_binary(Message), is_integer(Qos), Qos == 0; Qos == 1 -> + gen_server:call(?SERVER, {publish, Topic, Message, Qos}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %{ok, ServerOpts} = application:get_env(iot, emqx_server), + %{ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), + %{ok, _Props} = emqtt:connect(ConnPid), + + %SubOpts = [{qos, 1}], + %{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]), + + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({publish, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid}) -> + if + Qos == 0 -> + ok = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 0}]); + Qos == 1 -> + {ok, _PktId} = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 1}]); + true -> + ok + end, + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_http_client.erl b/apps/iot/src/iot_http_client.erl new file mode 100644 index 0000000..f1f73f6 --- /dev/null +++ b/apps/iot/src/iot_http_client.erl @@ -0,0 +1,40 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 03. 3月 2023 11:48 +%%%------------------------------------------------------------------- +-module(iot_http_client). +-author("licheng5"). + +%% API +-export([post/2]). + +post(Url, Body) when is_list(Url), is_binary(Body) -> + case hackney:request(post, Url, [], Body) of + {ok, 200, _, ClientRef} -> + case hackney:body(ClientRef) of + {ok, RespBody} -> + lager:debug("[iot_http_client] url: ~p, response is: ~p", [Url, RespBody]), + ok; + {error, Reason} -> + lager:warning("[iot_http_client] url: ~p, get error: ~p", [Url, Reason]), + {error, failed} + end; + + {ok, HttpCode, _, ClientRef} -> + case hackney:body(ClientRef) of + {ok, RespBody} -> + lager:debug("[iot_http_client] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), + ok; + {error, Reason} -> + lager:warning("[iot_http_client] url: ~p, http_code: ~p, get error: ~p", [Url, HttpCode, Reason]), + {error, failed} + end; + + {error, Reason} -> + lager:warning("[iot_http_client] url: ~p, get error: ~p", [Url, Reason]), + {error, failed} + end. \ No newline at end of file diff --git a/apps/iot/src/iot_issue.erl b/apps/iot/src/iot_issue.erl new file mode 100644 index 0000000..08b1c07 --- /dev/null +++ b/apps/iot/src/iot_issue.erl @@ -0,0 +1,119 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 10. 3月 2023 16:44 +%%%------------------------------------------------------------------- +-module(iot_issue). +-author("licheng5"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/1]). +-export([get_pid/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + issue :: #issue{}, + timer_ref +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +get_pid(IssueId) when is_integer(IssueId) -> + whereis(get_name(IssueId)). + +get_name(IssueId) when is_integer(IssueId) -> + list_to_atom("iot_issue:" ++ integer_to_list(IssueId)). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Issue :: #issue{}) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Issue = #issue{issue_id = IssueId}) -> + Name = get_name(IssueId), + gen_server:start_link({local, Name}, ?MODULE, [Issue], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Issue = #issue{timeout = Timeout0, hosts = Hosts}]) -> + lager:debug("iot_issue started!!: ~p", [Issue]), + %% 启动任务定时器, 默认最长为1个小时 + Timeout = if Timeout0 > 0 -> Timeout0; true -> 3600 end, + TimerRef = erlang:start_timer(Timeout * 1000, self(), issue_task_timeout), + + %% TODO 下发数据到主机 + lists:map(fun(Host) -> + iot_emqtt_client:publish(Host, x, 1) + end, Hosts), + + {ok, #state{issue = Issue, timer_ref = TimerRef}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_issue_sup.erl b/apps/iot/src/iot_issue_sup.erl new file mode 100644 index 0000000..9c4a369 --- /dev/null +++ b/apps/iot/src/iot_issue_sup.erl @@ -0,0 +1,73 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 10. 3月 2023 16:44 +%%%------------------------------------------------------------------- +-module(iot_issue_sup). +-author("licheng5"). +-include("iot.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_issue/1]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%% @doc Starts the supervisor +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +-spec(init(Args :: term()) -> + {ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(), + MaxR :: non_neg_integer(), MaxT :: non_neg_integer()}, + [ChildSpec :: supervisor:child_spec()]}} + | ignore | {error, Reason :: term()}). +init([]) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 1000, period => 3600}, + AChild = #{ + id => 'iot_issue', + start => {'iot_issue', start_link, []}, + restart => temporary, + shutdown => 2000, + type => worker, + modules => ['iot_issue'] + }, + + {ok, {SupFlags, [AChild]}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 启动一个任务 +-spec start_issue(Issue :: #issue{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_issue(Issue = #issue{}) -> + case supervisor:start_child(?MODULE, [Issue]) of + {ok, Pid} -> + {ok, Pid}; + {error, {'already_started', Pid}} -> + {ok, Pid}; + Error -> + Error + end. \ No newline at end of file diff --git a/apps/iot/src/iot_mnesia.erl b/apps/iot/src/iot_mnesia.erl index 408a981..82e7f3a 100644 --- a/apps/iot/src/iot_mnesia.erl +++ b/apps/iot/src/iot_mnesia.erl @@ -24,6 +24,14 @@ init_database() -> ok = mnesia:start(), %% 创建数据库表 + %% id生成表 + mnesia:create_table(id_generator, [ + {attributes, record_info(fields, id_generator)}, + {record_name, id_generator}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + %% 主机表 mnesia:create_table(host, [ {attributes, record_info(fields, host)}, @@ -48,6 +56,38 @@ init_database() -> {type, ordered_set} ]), + %% 转发规则表 + mnesia:create_table(router, [ + {attributes, record_info(fields, router)}, + {record_name, router}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + + %% 应用场景 + mnesia:create_table(scenario, [ + {attributes, record_info(fields, scenario)}, + {record_name, scenario}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + + %% 应用场景部署关系表 + mnesia:create_table(scenario_deploy, [ + {attributes, record_info(fields, scenario_deploy)}, + {record_name, scenario_deploy}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + + %% 工单表 + mnesia:create_table(issue, [ + {attributes, record_info(fields, issue)}, + {record_name, issue}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + ok. %% 加入集群 @@ -69,4 +109,7 @@ copy_database(MasterNode) when is_atom(MasterNode) -> mnesia:add_table_copy(host, node(), ram_copies), mnesia:add_table_copy(terminal, node(), ram_copies), mnesia:add_table_copy(service, node(), ram_copies), + mnesia:add_table_copy(scenario, node(), ram_copies), + mnesia:add_table_copy(scenario_deploy, node(), ram_copies), + mnesia:add_table_copy(issue, node(), ram_copies), ok. \ No newline at end of file diff --git a/apps/iot/src/iot_mock.erl b/apps/iot/src/iot_mock.erl index 5eb5f3b..f0d7b0d 100644 --- a/apps/iot/src/iot_mock.erl +++ b/apps/iot/src/iot_mock.erl @@ -11,8 +11,23 @@ -include("iot.hrl"). %% API --export([insert_hosts/0, insert_services/1, insert_terminals/1]). +-export([insert_hosts/0, insert_services/1, insert_terminals/1, insert_routers/0]). +-export([start_router/1]). -export([rsa_encode/1]). +-export([start_issue/0]). + +start_issue() -> + iot_issue_sup:start_issue(#issue{ + issue_id = 1, + name = <<"issue 1">>, + uid = 1234, + deploy_type = 1, + assoc_id = 1, + hosts = [<<"host1">>, <<"host2">>], + timeout = 6, + create_ts = 0, + status = 0 + }). insert_hosts() -> lists:foreach(fun(Id0) -> @@ -53,8 +68,6 @@ insert_services(HostId) -> insert_terminals(HostId) -> lists:foreach(fun(Id0) -> - Q0 = queue:new(), - Q = queue:in({1234, 21}, Q0), Terminal = #terminal{ terminal_id = integer_to_binary(Id0), host_id = HostId, @@ -71,6 +84,28 @@ insert_terminals(HostId) -> terminal_model:add_terminal(Terminal) end, lists:seq(1, 100)). +insert_routers() -> + lists:foreach(fun(Id0) -> + R = #router{ + router_id = Id0, + name = <<"计费电表"/utf8>>, + rule = <<"测试规则"/utf8>>, + endpoint = #http_endpoint{url = <<"http://127.0.0.1:8080/data">>}, + status = 1 + }, + router_model:add_router(R) + end, lists:seq(1, 100)). + +start_router(Id0) when is_integer(Id0) -> + R = #router{ + router_id = Id0, + name = <<"计费电表"/utf8>>, + rule = <<"测试规则"/utf8>>, + endpoint = #http_endpoint{url = <<"http://127.0.0.1:8080/data">>}, + status = 1 + }, + router_model:add_router(R), + iot_router_sup:start_new_router(R). rsa_encode(Data) when is_binary(Data) -> %% 读取相关配置 @@ -106,28 +141,4 @@ rsa_decode(EncData) when is_binary(EncData) -> PlainData = public_key:decrypt_private(EncData, PubKey), lager:debug("plain data is: ~p", [PlainData]), - ok. - -scan_and(Tokens) -> - scan_and(Tokens, [], [], 0). -%% 扫描完成并且最后一个表达式为空 -scan_and([], [], Acc, 0) -> - lists:reverse(Acc); -%% 扫描完成并且最后一个表达式不为空 -scan_and([], Expr, Acc, 0) -> - lists:reverse([{simple, lists:reverse(Expr)}|Acc]); -%% 遇到OR关键词, 并且此时的层级为0 -scan_and([32, $O, $R, 32|Tokens], Expr, Acc, 0) -> - scan_and(Tokens, [], [{simple, lists:reverse(Expr)}|Acc], 0); -%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符 -scan_and([Token|Tokens], Expr, Acc, Level) when Token == $( -> - scan_and(Tokens, [Token|Expr], Acc, Level + 1); -%% 扫描到右括号 && Level == 1; 此时的Expr表达式的内部嵌套的子串扫描完成 -scan_and([Token|Tokens], Expr, Acc, 1) when Token == $) -> - scan_and(Tokens, [Token|Expr], Acc, 0); -%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1 -scan_and([Token|Tokens], Expr, Acc, Level) when Token == $) -> - scan_and(Tokens, [Token|Expr], Acc, Level - 1); -%% 普通字符 -scan_and([Token|Tokens], Expr, Acc, Level) -> - scan_and(Tokens, [Token|Expr], Acc, Level). \ No newline at end of file + ok. \ No newline at end of file diff --git a/apps/iot/src/iot_message_handler.erl b/apps/iot/src/iot_mqtt_message_handler.erl similarity index 85% rename from apps/iot/src/iot_message_handler.erl rename to apps/iot/src/iot_mqtt_message_handler.erl index 7c01562..4a3702c 100644 --- a/apps/iot/src/iot_message_handler.erl +++ b/apps/iot/src/iot_mqtt_message_handler.erl @@ -6,7 +6,7 @@ %%% @end %%% Created : 18. 2月 2023 21:39 %%%------------------------------------------------------------------- --module(iot_message_handler). +-module(iot_mqtt_message_handler). -author("aresei"). -include("iot.hrl"). @@ -81,7 +81,7 @@ handle(<<"server.register">>, Msg = #{<<"c_id">> := ClientId, <<"r">> := PubKey, handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> case host_model:get_host(HostId) of {ok, #host{aes = Aes}} -> - Services = service_model:get_services(HostId), + Services = service_model:get_host_services(HostId), PlainData = iot_cipher_aes:decrypt(Aes, Aes, Data), case jiffy:decode(PlainData, [return_maps]) of Infos when is_list(Infos) -> @@ -107,7 +107,29 @@ handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> end; undefined -> lager:warning("[iot_message_handler] host_id: ~p, not exists", [HostId]) -end. +end; + +%% 处理服务器的ping +handle(<<"server.ping">>, #{<<"c">> := HostId, <<"at">> := At, + <<"h">> := #{<<"fm">> := FreeMemory, <<"fd">> := FreeDisk, <<"cp">> := CpuLoad}}) -> + + case host_model:get_host(HostId) of + {ok, Host=#host{metric = Metric = #host_metric{disk = Disk, memory = Memory}}} -> + NMetric = Metric#host_metric{ + disk = Disk#disk_metric{free = FreeDisk}, + memory = Memory#memory_metric{free = FreeMemory} + }, + + case mnesia:transaction(fun() -> mnesia:write(host, Host#host{metric = NMetric}, write) end) of + {atomic, ok} -> + ok; + {error, Reason} -> + lager:warning("[iot_message_handler] host_id: ~p, ping get error: ~p", [HostId, Reason]) + end; + + undefined -> + lager:warning("[iot_message_handler] host_id: ~p, not exists", [HostId]) + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl new file mode 100644 index 0000000..d295bbc --- /dev/null +++ b/apps/iot/src/iot_router.erl @@ -0,0 +1,105 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 01. 3月 2023 16:03 +%%%------------------------------------------------------------------- +-module(iot_router). +-author("licheng5"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/2]). + +-export([get_name/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +get_name(Id) when is_integer(Id) -> + list_to_atom("iot_router:" ++ integer_to_list(Id)). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Name :: atom(), Router :: #router{}) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Name, Router = #router{}) -> + gen_server:start_link({global, Name}, ?MODULE, [Router], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Router]) -> + lager:debug("router is: ~p", [Router]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_router_sup.erl b/apps/iot/src/iot_router_sup.erl new file mode 100644 index 0000000..3f85710 --- /dev/null +++ b/apps/iot/src/iot_router_sup.erl @@ -0,0 +1,86 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 01. 3月 2023 16:01 +%%%------------------------------------------------------------------- +-module(iot_router_sup). +-author("licheng5"). +-include("iot.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_new_router/1]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%% @doc Starts the supervisor +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +-spec(init(Args :: term()) -> + {ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(), + MaxR :: non_neg_integer(), MaxT :: non_neg_integer()}, + [ChildSpec :: supervisor:child_spec()]}} + | ignore | {error, Reason :: term()}). +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + + %% 启动目前生效的全部转发规则 + Specs = case router_model:get_all_valid_routers() of + {ok, Routers} -> + lists:map(fun generate_router_spec/1, Routers); + error -> + [] + end, + + {ok, {SupFlags, Specs}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 动态启动一个转发规则 +-spec start_new_router(Router :: #router{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_new_router(Router = #router{}) -> + Spec = generate_router_spec(Router), + case supervisor:start_child(?MODULE, Spec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, Error} -> + lager:debug("start router get a error: ~p", [Error]), + {error, Error} + end. + +generate_router_spec(Router = #router{router_id = RouterId}) -> + Id = iot_router:get_name(RouterId), + #{ + id => Id, + start => {'iot_router', start_link, [Id, Router]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_router'] + }. \ No newline at end of file diff --git a/apps/iot/src/iot_rule_parser.erl b/apps/iot/src/iot_rule_parser.erl index 834ea31..bc68107 100644 --- a/apps/iot/src/iot_rule_parser.erl +++ b/apps/iot/src/iot_rule_parser.erl @@ -11,6 +11,8 @@ %% API -export([parse/1, test/0]). +-export([parse_bracket/1]). + -record(or_expr, { expr = [] @@ -156,4 +158,60 @@ get_tag(0) -> get_tag(_) -> complex. +-record(bracket, { + items = [] +}). + +%% 括号解析 +parse_bracket(Tokens) -> + parse_bracket(Tokens, [], []). + +parse_bracket([], [], S) -> + #bracket{items = lists:reverse(S)}; +parse_bracket([], Acc, S) -> + #bracket{items = lists:reverse([lists:reverse(Acc)|S])}; +parse_bracket([$(|Tokens], Acc, S) -> + {ChildElements, RestTokens} = parse_bracket0(Tokens), + Bracket = parse_bracket(ChildElements), + parse_bracket(RestTokens, [], [Bracket, lists:reverse(Acc)|S]); +parse_bracket([H|Tokens], Acc, S) -> + parse_bracket(Tokens, [H|Acc], S). + +%% 截取配对的括号 +parse_bracket0(Tokens) -> + parse_bracket0(Tokens, [], 1). +parse_bracket0([$)|Tokens], Acc, Level) when Level - 1 == 0 -> + {lists:reverse(Acc), Tokens}; +parse_bracket0([$)|Tokens], Acc, Level) -> + parse_bracket0(Tokens, [$)|Acc], Level - 1); +parse_bracket0([$(|Tokens], Acc, Level) -> + parse_bracket0(Tokens, [$(|Acc], Level + 1); +parse_bracket0([H|Tokens], Acc, Level) -> + parse_bracket0(Tokens, [H|Acc], Level). + +%% 处理运算符号解析出表达式和操作符号 +%{bracket,["id < 3 OR ", +% {bracket,["name = 'anlicheng' OR ", +% {bracket,["name = 'test' AND ", +% {bracket,["y = 1 OR x = 1"]}]}]}, +% " OR ", +% {bracket,["1 = 1"]}]} + +syntax(Bracket = #bracket{items = Items}) -> + lists:map(fun(E) -> + case E of + B = #bracket{} -> + syntax(B); + Expr -> + string:split(Expr, "AND", all) + + end + end, Items), + + + + ok. + + + diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 2cddf90..3cb80b4 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -26,10 +26,35 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, - ChildSpecs = [], + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + ChildSpecs = [ + #{ + id => 'iot_router_sup', + start => {'iot_router_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_router_sup'] + }, + + #{ + id => 'iot_issue_sup', + start => {'iot_issue_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_issue_sup'] + }, + + #{ + id => 'iot_emqtt_client', + start => {'iot_emqtt_client', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_emqtt_client'] + } + ], {ok, {SupFlags, ChildSpecs}}. %% internal functions diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index ec01914..ecd9b0e 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -13,7 +13,7 @@ -export([timestamp/0, number_format/2, current_time/0]). -export([step/3, chunks/2, rand_bytes/1, uuid/0]). -export([json_data/1, json_error/2]). --export([queue_limited_in/3]). +-export([queue_limited_in/3, assert_exec/2]). %% 时间,精确到毫秒 timestamp() -> @@ -80,4 +80,10 @@ queue_limited_in(Item, Q, Num) when is_integer(Num) -> queue:in(Item, Q1); false -> queue:in(Item, Q) - end. \ No newline at end of file + end. + +-spec assert_exec(boolean(), any()) -> any(). +assert_exec(true, Fun) when is_function(Fun, 0) -> + {ok, Fun()}; +assert_exec(_, _) -> + aborted. \ No newline at end of file diff --git a/apps/iot/src/model/host_model.erl b/apps/iot/src/model/host_model.erl index bad464b..284e281 100644 --- a/apps/iot/src/model/host_model.erl +++ b/apps/iot/src/model/host_model.erl @@ -12,7 +12,7 @@ -include_lib("stdlib/include/qlc.hrl"). %% API --export([get_host/1, get_hosts/3, get_stat/0, add_host/1, change_status/2, delete/1, table_size/0, find_hosts/3]). +-export([get_host/1, get_hosts/3, get_stat/0, add_host/1, change_status/2, delete/1, table_size/0, find_hosts/3, activate/1]). -export([to_map/1]). get_host(HostId) when is_binary(HostId) -> @@ -97,6 +97,36 @@ change_status(HostId, Status) when is_binary(HostId), is_integer(Status) -> {error, Reason} end. +-spec activate(HostId :: binary()) -> ok | {error, Reason :: any()}. +activate(HostId) when is_binary(HostId) -> + Fun = fun() -> + case mnesia:read(host, HostId) of + [] -> + mnesia:abort(<<"host not found">>); + [Host = #host{status = Status}] -> + case Status =:= ?HOST_STATUS_INACTIVE of + true -> + Aes = iot_util:rand_bytes(16), + NHost = Host#host{ + aes = Aes, + activated_ts = iot_util:current_time(), + update_ts = iot_util:current_time(), + status = ?HOST_STATUS_ONLINE + }, + ok = mnesia:write(host, NHost, write), + NHost; + false -> + mnesia:abort(<<"host status invalid">>) + end + end + end, + case mnesia:transaction(Fun) of + {atomic, Host} -> + {ok, Host}; + {aborted, Reason} -> + {error, Reason} + end. + -spec delete(HostId :: binary()) -> ok | {error, Reason :: any()}. delete(HostId) when is_binary(HostId) -> case mnesia:transaction(fun() -> mnesia:delete(host, HostId, write) end) of diff --git a/apps/iot/src/model/id_generator_model.erl b/apps/iot/src/model/id_generator_model.erl new file mode 100644 index 0000000..2590ec2 --- /dev/null +++ b/apps/iot/src/model/id_generator_model.erl @@ -0,0 +1,22 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(id_generator_model). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([generate/1]). + +generate(Name) when is_list(Name) -> + case mnesia:transaction(fun() -> mnesia:dirty_update_counter(id_generator, Name, 1) end) of + {atomic, Id} -> + {ok, Id}; + {aborted, Reason} -> + {error, Reason} + end. \ No newline at end of file diff --git a/apps/iot/src/model/issue_model.erl b/apps/iot/src/model/issue_model.erl new file mode 100644 index 0000000..7ec209e --- /dev/null +++ b/apps/iot/src/model/issue_model.erl @@ -0,0 +1,109 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(issue_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB_NAME, issue). + +%% API +-export([get_issue/1, get_issues/3, add_issue/1, change_status/2, delete/1, table_size/0, get_user_issues/3]). +-export([to_map/1]). + +get_issue(IssueId) when is_integer(IssueId) -> + case mnesia:dirty_read(?TAB_NAME, IssueId) of + [Issue = #issue{}] -> + {ok, Issue}; + _ -> + undefined + end. + +%% 获取app信息 +-spec get_issues(Filter :: any(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: list(), TotalNum :: integer()} | + {error, Reason :: any()}. +get_issues(Spec, Start, Limit) when is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Items = mnesia:dirty_select(?TAB_NAME, [Spec]), + NItems = lists:sublist(Items, Start + 1, Limit), + {ok, NItems, length(Items)}. + +-spec get_user_issues(UserId :: integer(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: [#host{}], Num :: integer()} | + {error, Reason :: any()}. +get_user_issues(UserId, Start, Limit) when is_integer(UserId), is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB_NAME), E#issue.uid =:= UserId]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Items} when is_list(Items) -> + {ok, lists:sublist(Items, Start + 1, Limit), length(Items)}; + {aborted, Error} -> + {error, Error} + end. + +-spec add_issue(Issue :: #issue{}) -> ok | {error, Reason :: any()}. +add_issue(Issue = #issue{}) -> + case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, Issue, write) end) of + {atomic, _} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(IssueId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(IssueId, Status) when is_integer(IssueId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(?TAB_NAME, IssueId) of + [] -> + mnesia:abort(<<"issue not found">>); + [Issue] -> + mnesia:write(?TAB_NAME, Issue#issue{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(IssueId :: binary()) -> ok | {error, Reason :: any()}. +delete(IssueId) when is_integer(IssueId) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, IssueId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(?TAB_NAME, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#issue{issue_id = IssueId, name = Name, uid = Uid, deploy_type = DeployType, assoc_id = AssocId, hosts = Hosts, timeout = Timeout, + create_ts = CreateTs, results = Results, status = Status}) -> + + #{ + <<"issue_id">> => IssueId, + <<"name">> => Name, + <<"uid">> => Uid, + <<"deploy_type">> => DeployType, + <<"assoc_id">> => AssocId, + <<"hosts">> => Hosts, + <<"timeout">> => Timeout, + <<"status">> => Status, + <<"create_ts">> => CreateTs, + <<"results">> => Results + }. \ No newline at end of file diff --git a/apps/iot/src/model/router_model.erl b/apps/iot/src/model/router_model.erl new file mode 100644 index 0000000..bc6f190 --- /dev/null +++ b/apps/iot/src/model/router_model.erl @@ -0,0 +1,124 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(router_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +%% API +-export([get_router/1, get_routers/3, add_router/1, change_status/2, delete/1, table_size/0, get_all_routers/0, get_all_valid_routers/0]). +-export([to_map/1]). + +get_router(RouterId) when is_integer(RouterId) -> + case mnesia:dirty_read(router, RouterId) of + [Router = #router{}] -> + {ok, Router}; + _ -> + undefined + end. + +%% 获取app信息 +-spec get_all_routers() -> {ok, Items :: list()} | {error, Reason :: any()}. +get_all_routers() -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(router)]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Routers} -> + {ok, Routers}; + {aborted, _} -> + error + end. + +%% 获取app信息 +-spec get_all_valid_routers() -> {ok, Items :: list()} | {error, Reason :: any()}. +get_all_valid_routers() -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(router), E#router.status =:= 1]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Routers} -> + {ok, Routers}; + {aborted, _} -> + error + end. + +%% 获取app信息 +-spec get_routers(Filter :: any(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: list(), TotalNum :: integer()} | + {error, Reason :: any()}. +get_routers(Spec, Start, Limit) when is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Items0 = mnesia:dirty_select(router, [Spec]), + Items = lists:reverse(Items0), + NItems = lists:sublist(Items, Start + 1, Limit), + {ok, NItems, length(Items)}. + +-spec add_router(Router :: #router{}) -> ok | {error, Reason :: any()}. +add_router(Router = #router{}) -> + case mnesia:transaction(fun() -> mnesia:write(router, Router, write) end) of + {atomic, ok} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(RouterId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(RouterId, Status) when is_integer(RouterId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(router, RouterId) of + [] -> + mnesia:abort(<<"router not found">>); + [Router] -> + mnesia:write(host, Router#router{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(HostId :: integer()) -> ok | {error, Reason :: any()}. +delete(RouterId) when is_integer(RouterId) -> + case mnesia:transaction(fun() -> mnesia:delete(router, RouterId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(router, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#router{router_id = RouterId, name = Name, status = Status, endpoint = Endpoint}) -> + EndpointInfo = case Endpoint of + undefined -> + #{}; + #http_endpoint{url = Url} -> + #{ + <<"type">> => <<"http">>, + <<"url">> => Url + }; + #kafka_endpoint{} -> + #{} + end, + #{ + <<"router_id">> => RouterId, + <<"name">> => Name, + <<"status">> => Status, + <<"endpoint">> => EndpointInfo + }. \ No newline at end of file diff --git a/apps/iot/src/model/scenario_deploy_model.erl b/apps/iot/src/model/scenario_deploy_model.erl new file mode 100644 index 0000000..f99ea3f --- /dev/null +++ b/apps/iot/src/model/scenario_deploy_model.erl @@ -0,0 +1,97 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(scenario_deploy_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB_NAME, scenario_deploy). + +%% API +-export([get_host_deploy_list/1, get_scenario_deploy_list/1, add_deploy/1, change_status/2, delete/1, table_size/0]). +-export([to_map/1]). + +-spec get_host_deploy_list(HostId :: binary()) -> {ok, List :: [#scenario_deploy{}]} | error. +get_host_deploy_list(HostId) when is_binary(HostId) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB_NAME), E#scenario_deploy.host_id =:= HostId]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Items} -> + {ok, Items}; + {aborted, _} -> + error + end. + +-spec get_scenario_deploy_list(ScenarioId :: integer()) -> {ok, List :: [#scenario_deploy{}]} | error. +get_scenario_deploy_list(ScenarioId) when is_integer(ScenarioId) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB_NAME), E#scenario_deploy.scenario_id =:= ScenarioId]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Items} -> + {ok, Items}; + {aborted, _} -> + error + end. + +-spec add_deploy(Deploy :: #scenario_deploy{}) -> ok | {error, Reason :: any()}. +add_deploy(Deploy = #scenario_deploy{}) -> + case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, Deploy, write) end) of + {atomic, _} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(DeployId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(DeployId, Status) when is_integer(DeployId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(?TAB_NAME, DeployId) of + [] -> + mnesia:abort(<<"deploy not found">>); + [Deploy] -> + mnesia:write(?TAB_NAME, Deploy#scenario_deploy{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(DeployId :: binary()) -> ok | {error, Reason :: any()}. +delete(DeployId) when is_integer(DeployId) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, DeployId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(?TAB_NAME, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#scenario_deploy{deploy_id = DeployId, scenario_id = ScenarioId, host_id = HostId, create_ts = CreateTs, update_ts = UpdateTs, status = Status}) -> + #{ + <<"deploy_id">> => DeployId, + <<"scenario_id">> => ScenarioId, + <<"host_id">> => HostId, + <<"create_ts">> => CreateTs, + <<"update_ts">> => UpdateTs, + <<"status">> => Status + }. \ No newline at end of file diff --git a/apps/iot/src/model/scenario_model.erl b/apps/iot/src/model/scenario_model.erl new file mode 100644 index 0000000..b3dbcb8 --- /dev/null +++ b/apps/iot/src/model/scenario_model.erl @@ -0,0 +1,87 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(scenario_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +%% API +-export([get_scenario/1, get_scenario_list/3, add_scenario/1, change_status/2, delete/1, table_size/0]). +-export([to_map/1]). + +get_scenario(ScenarioId) when is_integer(ScenarioId) -> + case mnesia:dirty_read(scenario, ScenarioId) of + [Scenario = #scenario{}] -> + {ok, Scenario}; + _ -> + undefined + end. + +%% 获取app信息 +-spec get_scenario_list(Filter :: any(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: list(), TotalNum :: integer()} | + {error, Reason :: any()}. +get_scenario_list(Spec, Start, Limit) when is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Items0 = mnesia:dirty_select(scenario, [Spec]), + Items = lists:reverse(Items0), + NItems = lists:sublist(Items, Start + 1, Limit), + {ok, NItems, length(Items)}. + +-spec add_scenario(Scenario :: #scenario{}) -> ok | {error, Reason :: any()}. +add_scenario(Scenario = #scenario{}) -> + case mnesia:transaction(fun() -> mnesia:write(scenario, Scenario, write) end) of + {atomic, _} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(ScenarioId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(ScenarioId, Status) when is_integer(ScenarioId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(scenario, ScenarioId) of + [] -> + mnesia:abort(<<"scenario not found">>); + [Scenario] -> + mnesia:write(scenario, Scenario#scenario{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(ScenarioId :: binary()) -> ok | {error, Reason :: any()}. +delete(ScenarioId) when is_integer(ScenarioId) -> + case mnesia:transaction(fun() -> mnesia:delete(scenario, ScenarioId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(scenario, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#scenario{scenario_id = ScenarioId, name = Name, desc = Desc, rule = Rule, update_ts = UpdateTs, status = Status}) -> + #{ + <<"scenario_id">> => ScenarioId, + <<"name">> => Name, + <<"desc">> => Desc, + <<"rule">> => Rule, + <<"update_ts">> => UpdateTs, + <<"status">> => Status + }. \ No newline at end of file diff --git a/config/sys.config b/config/sys.config index 51ea41e..53513b0 100644 --- a/config/sys.config +++ b/config/sys.config @@ -5,7 +5,19 @@ {acceptors, 500}, {max_connections, 10240}, {backlog, 10240} + ]}, + + %% 目标服务器地址 + {emqx_server, [ + {host, ""}, + {port, 18080}, + {tcp_opts, []}, + {username, ""}, + {password, ""}, + {keepalive, 'Keepalive'}, + {retry_interval, 5} ]} + ]}, %% 系统日志配置,系统日志为lager, 支持日志按日期自动分割 diff --git a/docs/R.md b/docs/R.md new file mode 100644 index 0000000..32a21ae --- /dev/null +++ b/docs/R.md @@ -0,0 +1,2 @@ +## +here diff --git a/rebar.config b/rebar.config index 089d748..e04e0d7 100644 --- a/rebar.config +++ b/rebar.config @@ -6,6 +6,7 @@ {cowboy, ".*", {git, "https://github.com/ninenines/cowboy.git", {tag, "2.5.0"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, + {emqtt, ".*", {git, "https://github.com/emqx/emqtt", {tag, "v1.2.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}. diff --git a/rebar.lock b/rebar.lock index c0d8370..fc8b7b9 100644 --- a/rebar.lock +++ b/rebar.lock @@ -8,8 +8,17 @@ {git,"https://github.com/ninenines/cowlib", {ref,"106ba84bb04537879d8ce59321a04e0682110b91"}}, 1}, + {<<"emqtt">>, + {git,"https://github.com/emqx/emqtt", + {ref,"55e50041cc5b3416067c120eadb8774f1d3d1f4a"}}, + 0}, {<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1}, + {<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},1}, {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, + {<<"gun">>, + {git,"https://github.com/ninenines/gun", + {ref,"e7dd9f227e46979d8073e71c683395a809b78cb4"}}, + 1}, {<<"hackney">>, {git,"https://github.com/benoitc/hackney.git", {ref,"f3e9292db22c807e73f57a8422402d6b423ddf5f"}}, @@ -40,13 +49,14 @@ {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1}, {<<"sync">>, {git,"https://github.com/rustyio/sync.git", - {ref,"3f0049e809ffe303ae2cd395217a025ce6e758ae"}}, + {ref,"aa27b66ccfbfc798b57288af4cf7dc386e205d68"}}, 0}, {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.5.0">>},2}]}. [ {pkg_hash,[ {<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>}, {<<"fs">>, <<"9D147B944D60CFA48A349F12D06C8EE71128F610C90870BDF9A6773206452ED0">>}, + {<<"getopt">>, <<"C73A9FA687B217F2FF79F68A3B637711BB1936E712B521D8CE466B29CBF7808A">>}, {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, {<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>}, {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, @@ -56,6 +66,7 @@ {pkg_hash_ext,[ {<<"certifi">>, <<"3B3B5F36493004AC3455966991EAF6E768CE9884693D9968055AEEEB1E575040">>}, {<<"fs">>, <<"EF94E95FFE79916860649FED80AC62B04C322B0BB70F5128144C026B4D171F8B">>}, + {<<"getopt">>, <<"53E1AB83B9CEB65C9672D3E7A35B8092E9BDC9B3EE80721471A161C10C59959C">>}, {<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, {<<"idna">>, <<"A02C8A1C4FD601215BB0B0324C8A6986749F807CE35F25449EC9E69758708122">>}, {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>},