From 9f49fa9b5ce70ee961e077005fa838717c302181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 16:05:26 +0800 Subject: [PATCH 01/24] add rule sup --- apps/iot/src/iot_router.erl | 98 +++++++++++++++++++++++++++++++++ apps/iot/src/iot_router_sup.erl | 61 ++++++++++++++++++++ apps/iot/src/iot_sup.erl | 15 +++-- 3 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 apps/iot/src/iot_router.erl create mode 100644 apps/iot/src/iot_router_sup.erl diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl new file mode 100644 index 0000000..c339616 --- /dev/null +++ b/apps/iot/src/iot_router.erl @@ -0,0 +1,98 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 01. 3月 2023 16:03 +%%%------------------------------------------------------------------- +-module(iot_router). +-author("licheng5"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(iot_router_state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #iot_router_state{}} | {ok, State :: #iot_router_state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #iot_router_state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #iot_router_state{}) -> + {reply, Reply :: term(), NewState :: #iot_router_state{}} | + {reply, Reply :: term(), NewState :: #iot_router_state{}, timeout() | hibernate} | + {noreply, NewState :: #iot_router_state{}} | + {noreply, NewState :: #iot_router_state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #iot_router_state{}} | + {stop, Reason :: term(), NewState :: #iot_router_state{}}). +handle_call(_Request, _From, State = #iot_router_state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #iot_router_state{}) -> + {noreply, NewState :: #iot_router_state{}} | + {noreply, NewState :: #iot_router_state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #iot_router_state{}}). +handle_cast(_Request, State = #iot_router_state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #iot_router_state{}) -> + {noreply, NewState :: #iot_router_state{}} | + {noreply, NewState :: #iot_router_state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #iot_router_state{}}). +handle_info(_Info, State = #iot_router_state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #iot_router_state{}) -> term()). +terminate(_Reason, _State = #iot_router_state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #iot_router_state{}, + Extra :: term()) -> + {ok, NewState :: #iot_router_state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #iot_router_state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_router_sup.erl b/apps/iot/src/iot_router_sup.erl new file mode 100644 index 0000000..56848a4 --- /dev/null +++ b/apps/iot/src/iot_router_sup.erl @@ -0,0 +1,61 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 01. 3月 2023 16:01 +%%%------------------------------------------------------------------- +-module(iot_router_sup). +-author("licheng5"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%% @doc Starts the supervisor +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +-spec(init(Args :: term()) -> + {ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(), + MaxR :: non_neg_integer(), MaxT :: non_neg_integer()}, + [ChildSpec :: supervisor:child_spec()]}} + | ignore | {error, Reason :: term()}). +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + + AChild = #{ + id => 'iot_router', + start => {'iot_router', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_router'] + }, + + {ok, {SupFlags, [AChild]}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 2cddf90..3cf175d 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -26,10 +26,17 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, - ChildSpecs = [], + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + ChildSpecs = [ + #{ + id => 'iot_router_sup', + start => {'iot_router_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_router_sup'] + } + ], {ok, {SupFlags, ChildSpecs}}. %% internal functions From b4a07fe27308f15f217a262dd21fd367859f28cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 17:06:32 +0800 Subject: [PATCH 02/24] add rule --- apps/iot/include/iot.hrl | 7 ++ apps/iot/src/http_router_handler.erl | 74 ++++++++++++++ apps/iot/src/iot_mnesia.erl | 8 ++ apps/iot/src/model/id_generator_model.erl | 22 ++++ apps/iot/src/model/router_model.erl | 117 ++++++++++++++++++++++ 5 files changed, 228 insertions(+) create mode 100644 apps/iot/src/http_router_handler.erl create mode 100644 apps/iot/src/model/id_generator_model.erl create mode 100644 apps/iot/src/model/router_model.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 5995dda..75c4f80 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -84,6 +84,12 @@ interfaces = [] }). +%% id生成器 +-record(id_generator, { + name :: string(), + seq = 0 :: integer() +}). + %% 主机定义 -record(host, { %% ID @@ -164,6 +170,7 @@ %% 数据转换规则表 -record(router, { + router_id :: integer(), %% 名称 name = <<>>, %% 数据过滤规则 diff --git a/apps/iot/src/http_router_handler.erl b/apps/iot/src/http_router_handler.erl new file mode 100644 index 0000000..2eece01 --- /dev/null +++ b/apps/iot/src/http_router_handler.erl @@ -0,0 +1,74 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(http_router_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_request(_, "/router/list", Params, PostParams) -> + Page0 = maps:get(<<"page">>, Params, <<"1">>), + Size0 = maps:get(<<"size">>, Params, <<"10">>), + Page = binary_to_integer(Page0), + Size = binary_to_integer(Size0), + + true = Page > 0 andalso Size > 0, + Start = (Page - 1) * Size, + + %% 处理查询条件 + Name = maps:get(<<"name">>, PostParams, <<"">>), + + MatchHead = #router{name = '$1', _ = '_'}, + Guard = [], + Guard1 = case Name =/= <<"">> of + true -> + [{'=:=', '$1', Name}|Guard]; + false -> + Guard + end, + + Result = ['$_'], + + case router_model:get_routers({MatchHead, Guard1, Result}, Start, Size) of + {ok, Routers, TotalNum} -> + Response = #{ + <<"routers">> => lists:map(fun(R) -> router_model:to_map(R) end, Routers), + <<"total_num">> => TotalNum + }, + + {ok, 200, iot_util:json_data(Response)}; + {error, Reason} -> + lager:warning("[host_handler] get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, <<"database error">>)} + end; + +handle_request("GET", "/router/detail", #{<<"router_id">> := RouterId}, _) -> + case router_model:get_router(RouterId) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"router not found">>)}; + {ok, Router} -> + RouterInfo = router_model:to_map(Router), + + {ok, 200, iot_util:json_data(RouterInfo)} + end; + +handle_request("POST", "/router/changer_status", _, #{<<"router_id">> := RouterId, <<"status">> := NStatus}) -> + lager:debug("[router_handler] post params is: ~p", [_Params]), + + case router_model:change_status(RouterId, NStatus) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, _} -> + {ok, 200, iot_util:json_error(404, <<"error">>)} + end. \ No newline at end of file diff --git a/apps/iot/src/iot_mnesia.erl b/apps/iot/src/iot_mnesia.erl index 408a981..5e88006 100644 --- a/apps/iot/src/iot_mnesia.erl +++ b/apps/iot/src/iot_mnesia.erl @@ -24,6 +24,14 @@ init_database() -> ok = mnesia:start(), %% 创建数据库表 + %% id生成表 + mnesia:create_table(id_generator, [ + {attributes, record_info(fields, id_generator)}, + {record_name, id_generator}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + %% 主机表 mnesia:create_table(host, [ {attributes, record_info(fields, host)}, diff --git a/apps/iot/src/model/id_generator_model.erl b/apps/iot/src/model/id_generator_model.erl new file mode 100644 index 0000000..2590ec2 --- /dev/null +++ b/apps/iot/src/model/id_generator_model.erl @@ -0,0 +1,22 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(id_generator_model). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([generate/1]). + +generate(Name) when is_list(Name) -> + case mnesia:transaction(fun() -> mnesia:dirty_update_counter(id_generator, Name, 1) end) of + {atomic, Id} -> + {ok, Id}; + {aborted, Reason} -> + {error, Reason} + end. \ No newline at end of file diff --git a/apps/iot/src/model/router_model.erl b/apps/iot/src/model/router_model.erl new file mode 100644 index 0000000..59b3bcc --- /dev/null +++ b/apps/iot/src/model/router_model.erl @@ -0,0 +1,117 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(router_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +%% API +-export([get_router/1, get_routers/3, add_router/1, change_status/2, delete/1, table_size/0]). +-export([to_map/1]). + +get_router(RouterId) when is_integer(RouterId) -> + case mnesia:dirty_read(router, RouterId) of + [Router = #router{}] -> + {ok, Router}; + _ -> + undefined + end. + +%% 获取app信息 +-spec get_routers(Filter :: any(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: list(), TotalNum :: integer()} | + {error, Reason :: any()}. +get_routers(Spec, Start, Limit) when is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Items0 = mnesia:dirty_select(router, [Spec]), + Items = lists:reverse(Items0), + NItems = lists:sublist(Items, Start + 1, Limit), + {ok, NItems, length(Items)}. + +-spec add_router(Router :: #router{}) -> ok | {error, Reason :: any()}. +add_router(Router = #router{}) -> + case mnesia:transaction(fun() -> mnesia:write(router, Router, write) end) of + {atomic, ok} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(RouterId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(RouterId, Status) when is_integer(RouterId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(router, RouterId) of + [] -> + mnesia:abort(<<"router not found">>); + [Router] -> + mnesia:write(host, Router#router{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(HostId :: integer()) -> ok | {error, Reason :: any()}. +delete(RouterId) when is_integer(RouterId) -> + case mnesia:transaction(fun() -> mnesia:delete(router, RouterId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(router, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#host{host_id = HostId, name = Name, model = Model, cell_id = CellId, activated_ts = ActivatedTs, update_ts = UpdateTs, status = Status, + metric = #host_metric{ + cpus = Cpus, + cpu_temperature = CpuTemperature, + memory = #memory_metric{used = MemoryUsed, total = MemoryTotal}, + disk = #disk_metric{used = DiskUsed, total = DiskTotal}, + interfaces = Interfaces}}) -> + #{ + <<"host_id">> => HostId, + <<"name">> => Name, + <<"model">> => Model, + <<"cell_id">> => CellId, + <<"activated_ts">> => ActivatedTs, + <<"update_ts">> => UpdateTs, + <<"status">> => Status, + <<"metric">> => #{ + <<"cpus">> => lists:map(fun(#cpu_metric{num = Num, load = Load}) -> + #{<<"num">> => Num, <<"load">> => Load } + end, + Cpus), + <<"cpu_temperature">> => CpuTemperature, + <<"memory">> => #{ + <<"used">> => MemoryUsed, + <<"total">> => MemoryTotal + }, + <<"disk">> => #{ + <<"used">> => DiskUsed, + <<"total">> => DiskTotal + }, + <<"interfaces">> => lists:map(fun(#interface_metric{name = IName, desc = IDesc, status = IStatus, detail = IDetail}) -> + #{ + <<"name">> => IName, + <<"desc">> => IDesc, + <<"status">> => IStatus, + <<"detatil">> => IDetail + } + end, Interfaces) + } + }. \ No newline at end of file From e7b96b3cd56753d31b48e85ee6749cdeb48c795b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 17:52:35 +0800 Subject: [PATCH 03/24] fix routers --- apps/iot/src/iot_mock.erl | 40 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/apps/iot/src/iot_mock.erl b/apps/iot/src/iot_mock.erl index 5eb5f3b..738b2f4 100644 --- a/apps/iot/src/iot_mock.erl +++ b/apps/iot/src/iot_mock.erl @@ -11,7 +11,7 @@ -include("iot.hrl"). %% API --export([insert_hosts/0, insert_services/1, insert_terminals/1]). +-export([insert_hosts/0, insert_services/1, insert_terminals/1, insert_routers/1]). -export([rsa_encode/1]). insert_hosts() -> @@ -71,6 +71,18 @@ insert_terminals(HostId) -> terminal_model:add_terminal(Terminal) end, lists:seq(1, 100)). +insert_routers(HostId) -> + lists:foreach(fun(Id0) -> + R = #router{ + router_id = Id0, + name = <<"计费电表"/utf8>>, + rule = <<"测试规则"/utf8>>, + endpoint = #http_endpoint{url = <<"http://127.0.0.1:8080/data">>}, + status = 0 + }, + router_model:add_router(R) + end, lists:seq(1, 100)). + rsa_encode(Data) when is_binary(Data) -> %% 读取相关配置 @@ -106,28 +118,4 @@ rsa_decode(EncData) when is_binary(EncData) -> PlainData = public_key:decrypt_private(EncData, PubKey), lager:debug("plain data is: ~p", [PlainData]), - ok. - -scan_and(Tokens) -> - scan_and(Tokens, [], [], 0). -%% 扫描完成并且最后一个表达式为空 -scan_and([], [], Acc, 0) -> - lists:reverse(Acc); -%% 扫描完成并且最后一个表达式不为空 -scan_and([], Expr, Acc, 0) -> - lists:reverse([{simple, lists:reverse(Expr)}|Acc]); -%% 遇到OR关键词, 并且此时的层级为0 -scan_and([32, $O, $R, 32|Tokens], Expr, Acc, 0) -> - scan_and(Tokens, [], [{simple, lists:reverse(Expr)}|Acc], 0); -%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符 -scan_and([Token|Tokens], Expr, Acc, Level) when Token == $( -> - scan_and(Tokens, [Token|Expr], Acc, Level + 1); -%% 扫描到右括号 && Level == 1; 此时的Expr表达式的内部嵌套的子串扫描完成 -scan_and([Token|Tokens], Expr, Acc, 1) when Token == $) -> - scan_and(Tokens, [Token|Expr], Acc, 0); -%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1 -scan_and([Token|Tokens], Expr, Acc, Level) when Token == $) -> - scan_and(Tokens, [Token|Expr], Acc, Level - 1); -%% 普通字符 -scan_and([Token|Tokens], Expr, Acc, Level) -> - scan_and(Tokens, [Token|Expr], Acc, Level). \ No newline at end of file + ok. \ No newline at end of file From bf9369df6d67dae6fe2bab3f632c54945a12a1df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 17:56:16 +0800 Subject: [PATCH 04/24] fix router --- apps/iot/src/model/router_model.erl | 49 +++++++++-------------------- 1 file changed, 14 insertions(+), 35 deletions(-) diff --git a/apps/iot/src/model/router_model.erl b/apps/iot/src/model/router_model.erl index 59b3bcc..eb9be11 100644 --- a/apps/iot/src/model/router_model.erl +++ b/apps/iot/src/model/router_model.erl @@ -76,42 +76,21 @@ table_size() -> %% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -to_map(#host{host_id = HostId, name = Name, model = Model, cell_id = CellId, activated_ts = ActivatedTs, update_ts = UpdateTs, status = Status, - metric = #host_metric{ - cpus = Cpus, - cpu_temperature = CpuTemperature, - memory = #memory_metric{used = MemoryUsed, total = MemoryTotal}, - disk = #disk_metric{used = DiskUsed, total = DiskTotal}, - interfaces = Interfaces}}) -> +to_map(#router{router_id = RouterId, name = Name, status = Status, endpoint = Endpoint}) -> + EndpointInfo = case Endpoint of + undefined -> + #{}; + #http_endpoint{url = Url} -> + #{ + <<"type">> => <<"http">>, + <<"url">> => Url + }; + #kafka_endpoint{} -> + #{} + end, #{ - <<"host_id">> => HostId, + <<"router_id">> => RouterId, <<"name">> => Name, - <<"model">> => Model, - <<"cell_id">> => CellId, - <<"activated_ts">> => ActivatedTs, - <<"update_ts">> => UpdateTs, <<"status">> => Status, - <<"metric">> => #{ - <<"cpus">> => lists:map(fun(#cpu_metric{num = Num, load = Load}) -> - #{<<"num">> => Num, <<"load">> => Load } - end, - Cpus), - <<"cpu_temperature">> => CpuTemperature, - <<"memory">> => #{ - <<"used">> => MemoryUsed, - <<"total">> => MemoryTotal - }, - <<"disk">> => #{ - <<"used">> => DiskUsed, - <<"total">> => DiskTotal - }, - <<"interfaces">> => lists:map(fun(#interface_metric{name = IName, desc = IDesc, status = IStatus, detail = IDetail}) -> - #{ - <<"name">> => IName, - <<"desc">> => IDesc, - <<"status">> => IStatus, - <<"detatil">> => IDetail - } - end, Interfaces) - } + <<"endpoint">> => EndpointInfo }. \ No newline at end of file From ee9bfc3bc4f755571168ab727e81437beeb067af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 17:56:53 +0800 Subject: [PATCH 05/24] fix router --- apps/iot/src/http_router_handler.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/iot/src/http_router_handler.erl b/apps/iot/src/http_router_handler.erl index 2eece01..e292b11 100644 --- a/apps/iot/src/http_router_handler.erl +++ b/apps/iot/src/http_router_handler.erl @@ -63,8 +63,8 @@ handle_request("GET", "/router/detail", #{<<"router_id">> := RouterId}, _) -> {ok, 200, iot_util:json_data(RouterInfo)} end; -handle_request("POST", "/router/changer_status", _, #{<<"router_id">> := RouterId, <<"status">> := NStatus}) -> - lager:debug("[router_handler] post params is: ~p", [_Params]), +handle_request("POST", "/router/changer_status", _, Params = #{<<"router_id">> := RouterId, <<"status">> := NStatus}) -> + lager:debug("[router_handler] post params is: ~p", [Params]), case router_model:change_status(RouterId, NStatus) of ok -> From fc53ed34b9f2d8d7d368bcd0a51b9ac5abd641c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 19:33:46 +0800 Subject: [PATCH 06/24] fix handler --- .../src/{ => handler}/http_api_handler.erl | 0 .../src/{ => handler}/http_host_handler.erl | 0 .../src/{ => handler}/http_router_handler.erl | 0 .../src/{ => handler}/iot_message_handler.erl | 0 apps/iot/src/iot_app.erl | 5 ++-- apps/iot/src/iot_mnesia.erl | 8 +++++ apps/iot/src/iot_mock.erl | 8 ++--- apps/iot/src/model/router_model.erl | 30 ++++++++++++++++++- 8 files changed, 43 insertions(+), 8 deletions(-) rename apps/iot/src/{ => handler}/http_api_handler.erl (100%) rename apps/iot/src/{ => handler}/http_host_handler.erl (100%) rename apps/iot/src/{ => handler}/http_router_handler.erl (100%) rename apps/iot/src/{ => handler}/iot_message_handler.erl (100%) diff --git a/apps/iot/src/http_api_handler.erl b/apps/iot/src/handler/http_api_handler.erl similarity index 100% rename from apps/iot/src/http_api_handler.erl rename to apps/iot/src/handler/http_api_handler.erl diff --git a/apps/iot/src/http_host_handler.erl b/apps/iot/src/handler/http_host_handler.erl similarity index 100% rename from apps/iot/src/http_host_handler.erl rename to apps/iot/src/handler/http_host_handler.erl diff --git a/apps/iot/src/http_router_handler.erl b/apps/iot/src/handler/http_router_handler.erl similarity index 100% rename from apps/iot/src/http_router_handler.erl rename to apps/iot/src/handler/http_router_handler.erl diff --git a/apps/iot/src/iot_message_handler.erl b/apps/iot/src/handler/iot_message_handler.erl similarity index 100% rename from apps/iot/src/iot_message_handler.erl rename to apps/iot/src/handler/iot_message_handler.erl diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 1a118a2..94801c6 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -35,8 +35,9 @@ start_http_server() -> Dispatcher = cowboy_router:compile([ {'_', [ - {"/host/[...]", http_protocol, [http_host_handler]} - ]} + {"/host/[...]", http_protocol, [http_host_handler]}, + {"/router/[...]", http_protocol, [http_router_handler]} + ]} ]), TransOpts = [ {port, Port}, diff --git a/apps/iot/src/iot_mnesia.erl b/apps/iot/src/iot_mnesia.erl index 5e88006..928b899 100644 --- a/apps/iot/src/iot_mnesia.erl +++ b/apps/iot/src/iot_mnesia.erl @@ -56,6 +56,14 @@ init_database() -> {type, ordered_set} ]), + %% 转发规则表 + mnesia:create_table(router, [ + {attributes, record_info(fields, router)}, + {record_name, router}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + ok. %% 加入集群 diff --git a/apps/iot/src/iot_mock.erl b/apps/iot/src/iot_mock.erl index 738b2f4..afa7708 100644 --- a/apps/iot/src/iot_mock.erl +++ b/apps/iot/src/iot_mock.erl @@ -11,7 +11,7 @@ -include("iot.hrl"). %% API --export([insert_hosts/0, insert_services/1, insert_terminals/1, insert_routers/1]). +-export([insert_hosts/0, insert_services/1, insert_terminals/1, insert_routers/0]). -export([rsa_encode/1]). insert_hosts() -> @@ -53,8 +53,6 @@ insert_services(HostId) -> insert_terminals(HostId) -> lists:foreach(fun(Id0) -> - Q0 = queue:new(), - Q = queue:in({1234, 21}, Q0), Terminal = #terminal{ terminal_id = integer_to_binary(Id0), host_id = HostId, @@ -71,14 +69,14 @@ insert_terminals(HostId) -> terminal_model:add_terminal(Terminal) end, lists:seq(1, 100)). -insert_routers(HostId) -> +insert_routers() -> lists:foreach(fun(Id0) -> R = #router{ router_id = Id0, name = <<"计费电表"/utf8>>, rule = <<"测试规则"/utf8>>, endpoint = #http_endpoint{url = <<"http://127.0.0.1:8080/data">>}, - status = 0 + status = 1 }, router_model:add_router(R) end, lists:seq(1, 100)). diff --git a/apps/iot/src/model/router_model.erl b/apps/iot/src/model/router_model.erl index eb9be11..bc6f190 100644 --- a/apps/iot/src/model/router_model.erl +++ b/apps/iot/src/model/router_model.erl @@ -12,7 +12,7 @@ -include_lib("stdlib/include/qlc.hrl"). %% API --export([get_router/1, get_routers/3, add_router/1, change_status/2, delete/1, table_size/0]). +-export([get_router/1, get_routers/3, add_router/1, change_status/2, delete/1, table_size/0, get_all_routers/0, get_all_valid_routers/0]). -export([to_map/1]). get_router(RouterId) when is_integer(RouterId) -> @@ -23,6 +23,34 @@ get_router(RouterId) when is_integer(RouterId) -> undefined end. +%% 获取app信息 +-spec get_all_routers() -> {ok, Items :: list()} | {error, Reason :: any()}. +get_all_routers() -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(router)]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Routers} -> + {ok, Routers}; + {aborted, _} -> + error + end. + +%% 获取app信息 +-spec get_all_valid_routers() -> {ok, Items :: list()} | {error, Reason :: any()}. +get_all_valid_routers() -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(router), E#router.status =:= 1]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Routers} -> + {ok, Routers}; + {aborted, _} -> + error + end. + %% 获取app信息 -spec get_routers(Filter :: any(), Start :: integer(), Limit :: integer()) -> {ok, Items :: list(), TotalNum :: integer()} | From 45d41245f95e3198d3ad051124c2665f78a44df3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Wed, 1 Mar 2023 20:44:19 +0800 Subject: [PATCH 07/24] fix --- apps/iot/src/iot_rule_parser.erl | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/apps/iot/src/iot_rule_parser.erl b/apps/iot/src/iot_rule_parser.erl index ee94354..d14b6b4 100644 --- a/apps/iot/src/iot_rule_parser.erl +++ b/apps/iot/src/iot_rule_parser.erl @@ -11,6 +11,8 @@ %% API -export([parse/1, test/0]). +-export([parse_bracket/1]). + test() -> %Rule = <<"SELECT * FROM service.data WHERE id > 0 AND (name = 'anlicheng' OR name = 'test')">>, @@ -133,3 +135,29 @@ get_tag(_) -> complex. +%% 括号解析 +parse_bracket(Tokens) -> + parse_bracket(Tokens, [], []). + +parse_bracket([], [], S) -> + lists:reverse(S); +parse_bracket([], Acc, S) -> + lager:debug("acc is: ~p", [Acc]), + lists:reverse([lists:reverse(Acc)|S]); +parse_bracket([$(|Tokens], Acc, S) -> + {Child, RestTokens} = parse_bracket0(Tokens, []), + %lager:debug("child is: ~p, rest: ~p, s1: ~p", [Child, RestTokens, [Child, lists:reverse(Acc)|S]]), + parse_bracket(RestTokens, [], [Child, lists:reverse(Acc)|S]); +parse_bracket([H|Tokens], Acc, S) -> + parse_bracket(Tokens, [H|Acc], S). + +parse_bracket0([$(|Tokens], Acc) -> + Child = parse_bracket(Tokens), + +parse_bracket0([$)|Tokens], Acc) -> + Child = lists:reverse(Acc), + {Child, Tokens}; +parse_bracket0([H|Tokens], Acc) -> + parse_bracket0(Tokens, [H|Acc]). + + From a4aa4b52680494fe4b909919ac16a0edae4e761a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 3 Mar 2023 11:19:55 +0800 Subject: [PATCH 08/24] fix rule --- apps/iot/src/iot_rule_parser.erl | 56 ++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/apps/iot/src/iot_rule_parser.erl b/apps/iot/src/iot_rule_parser.erl index d14b6b4..cc9cf42 100644 --- a/apps/iot/src/iot_rule_parser.erl +++ b/apps/iot/src/iot_rule_parser.erl @@ -134,30 +134,60 @@ get_tag(0) -> get_tag(_) -> complex. +-record(bracket, { + items = [] +}). %% 括号解析 parse_bracket(Tokens) -> parse_bracket(Tokens, [], []). parse_bracket([], [], S) -> - lists:reverse(S); + #bracket{items = lists:reverse(S)}; parse_bracket([], Acc, S) -> - lager:debug("acc is: ~p", [Acc]), - lists:reverse([lists:reverse(Acc)|S]); + #bracket{items = lists:reverse([lists:reverse(Acc)|S])}; parse_bracket([$(|Tokens], Acc, S) -> - {Child, RestTokens} = parse_bracket0(Tokens, []), - %lager:debug("child is: ~p, rest: ~p, s1: ~p", [Child, RestTokens, [Child, lists:reverse(Acc)|S]]), - parse_bracket(RestTokens, [], [Child, lists:reverse(Acc)|S]); + {ChildElements, RestTokens} = parse_bracket0(Tokens), + Bracket = parse_bracket(ChildElements), + parse_bracket(RestTokens, [], [Bracket, lists:reverse(Acc)|S]); parse_bracket([H|Tokens], Acc, S) -> parse_bracket(Tokens, [H|Acc], S). -parse_bracket0([$(|Tokens], Acc) -> - Child = parse_bracket(Tokens), +%% 截取配对的括号 +parse_bracket0(Tokens) -> + parse_bracket0(Tokens, [], 1). +parse_bracket0([$)|Tokens], Acc, Level) when Level - 1 == 0 -> + {lists:reverse(Acc), Tokens}; +parse_bracket0([$)|Tokens], Acc, Level) -> + parse_bracket0(Tokens, [$)|Acc], Level - 1); +parse_bracket0([$(|Tokens], Acc, Level) -> + parse_bracket0(Tokens, [$(|Acc], Level + 1); +parse_bracket0([H|Tokens], Acc, Level) -> + parse_bracket0(Tokens, [H|Acc], Level). + +%% 处理运算符号解析出表达式和操作符号 +%{bracket,["id < 3 OR ", +% {bracket,["name = 'anlicheng' OR ", +% {bracket,["name = 'test' AND ", +% {bracket,["y = 1 OR x = 1"]}]}]}, +% " OR ", +% {bracket,["1 = 1"]}]} + +syntax(Bracket = #bracket{items = Items}) -> + lists:map(fun(E) -> + case E of + B = #bracket{} -> + syntax(B); + Expr -> + string:split(Expr, "AND", all) + + end + end, Items), + + + + ok. + -parse_bracket0([$)|Tokens], Acc) -> - Child = lists:reverse(Acc), - {Child, Tokens}; -parse_bracket0([H|Tokens], Acc) -> - parse_bracket0(Tokens, [H|Acc]). From b276134de1e47d83847b9efeb7cc23aafb7f0901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 3 Mar 2023 11:37:41 +0800 Subject: [PATCH 09/24] fix router sup --- apps/iot/src/iot_router.erl | 73 ++++++++++++++++++--------------- apps/iot/src/iot_router_sup.erl | 41 +++++++++++++----- 2 files changed, 71 insertions(+), 43 deletions(-) diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index c339616..d295bbc 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -8,29 +8,35 @@ %%%------------------------------------------------------------------- -module(iot_router). -author("licheng5"). +-include("iot.hrl"). -behaviour(gen_server). %% API --export([start_link/0]). +-export([start_link/2]). +-export([get_name/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). --record(iot_router_state, {}). +-record(state, { + +}). + +get_name(Id) when is_integer(Id) -> + list_to_atom("iot_router:" ++ integer_to_list(Id)). %%%=================================================================== %%% API %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> +-spec(start_link(Name :: atom(), Router :: #router{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(Name, Router = #router{}) -> + gen_server:start_link({global, Name}, ?MODULE, [Router], []). %%%=================================================================== %%% gen_server callbacks @@ -39,40 +45,41 @@ start_link() -> %% @private %% @doc Initializes the server -spec(init(Args :: term()) -> - {ok, State :: #iot_router_state{}} | {ok, State :: #iot_router_state{}, timeout() | hibernate} | + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #iot_router_state{}}. +init([Router]) -> + lager:debug("router is: ~p", [Router]), + {ok, #state{}}. %% @private %% @doc Handling call messages -spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #iot_router_state{}) -> - {reply, Reply :: term(), NewState :: #iot_router_state{}} | - {reply, Reply :: term(), NewState :: #iot_router_state{}, timeout() | hibernate} | - {noreply, NewState :: #iot_router_state{}} | - {noreply, NewState :: #iot_router_state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #iot_router_state{}} | - {stop, Reason :: term(), NewState :: #iot_router_state{}}). -handle_call(_Request, _From, State = #iot_router_state{}) -> + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. %% @private %% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #iot_router_state{}) -> - {noreply, NewState :: #iot_router_state{}} | - {noreply, NewState :: #iot_router_state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #iot_router_state{}}). -handle_cast(_Request, State = #iot_router_state{}) -> +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> {noreply, State}. %% @private %% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #iot_router_state{}) -> - {noreply, NewState :: #iot_router_state{}} | - {noreply, NewState :: #iot_router_state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #iot_router_state{}}). -handle_info(_Info, State = #iot_router_state{}) -> +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> {noreply, State}. %% @private @@ -81,16 +88,16 @@ handle_info(_Info, State = #iot_router_state{}) -> %% necessary cleaning up. When it returns, the gen_server terminates %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #iot_router_state{}) -> term()). -terminate(_Reason, _State = #iot_router_state{}) -> + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> ok. %% @private %% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #iot_router_state{}, +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) -> - {ok, NewState :: #iot_router_state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #iot_router_state{}, _Extra) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> {ok, State}. %%%=================================================================== diff --git a/apps/iot/src/iot_router_sup.erl b/apps/iot/src/iot_router_sup.erl index 56848a4..e369e8b 100644 --- a/apps/iot/src/iot_router_sup.erl +++ b/apps/iot/src/iot_router_sup.erl @@ -8,11 +8,12 @@ %%%------------------------------------------------------------------- -module(iot_router_sup). -author("licheng5"). +-include("iot.hrl"). -behaviour(supervisor). %% API --export([start_link/0]). +-export([start_link/0, start_new_router/1]). %% Supervisor callbacks -export([init/1]). @@ -45,17 +46,37 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - AChild = #{ - id => 'iot_router', - start => {'iot_router', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_router'] - }, + %% 启动目前生效的全部转发规则 + {ok, Routers} = router_model:get_all_valid_routers(), + Specs = lists:map(fun generate_router_spec/1, Routers), - {ok, {SupFlags, [AChild]}}. + {ok, {SupFlags, Specs}}. %%%=================================================================== %%% Internal functions %%%=================================================================== + +%% 动态启动一个转发规则 +-spec start_new_router(Router :: #router{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_new_router(Router = #router{}) -> + Spec = generate_router_spec(Router), + case supervisor:start_child(?MODULE, Spec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, Error} -> + lager:debug("start router get a error: ~p", [Error]), + {error, Error} + end. + +generate_router_spec(Router = #router{router_id = RouterId}) -> + Id = iot_router:get_name(RouterId), + #{ + id => Id, + start => {'iot_router', start_link, [Id, Router]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_router'] + }. \ No newline at end of file From 486d870b42499e0a41fe6525740ec3d665e368f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 3 Mar 2023 11:59:19 +0800 Subject: [PATCH 10/24] fix http client --- apps/iot/src/handler/http_api_handler.erl | 3 +- apps/iot/src/iot.app.src | 3 ++ apps/iot/src/iot_app.erl | 1 + apps/iot/src/iot_http_client.erl | 40 +++++++++++++++++++++++ apps/iot/src/iot_mock.erl | 11 +++++++ apps/iot/src/iot_router_sup.erl | 8 +++-- 6 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 apps/iot/src/iot_http_client.erl diff --git a/apps/iot/src/handler/http_api_handler.erl b/apps/iot/src/handler/http_api_handler.erl index 7af6d71..24c3e96 100644 --- a/apps/iot/src/handler/http_api_handler.erl +++ b/apps/iot/src/handler/http_api_handler.erl @@ -18,5 +18,6 @@ handle_request("GET", "/api/booking", _, _Params) -> {ok, 200, iot_util:json_data(<<"success">>)}; -handle_request("POST", "/api/booking", _, _Params) -> +handle_request("POST", _, _, Params) -> + lager:debug("body is: ~p", [Params]), {ok, 200, iot_util:json_data(<<"success">>)}. \ No newline at end of file diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 5df4b41..9d1ffb3 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -13,8 +13,11 @@ parse_trans, hackney, poolboy, + mnesia, crypto, + public_key, + ssl, kernel, stdlib ]}, diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 94801c6..07414f3 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -36,6 +36,7 @@ start_http_server() -> Dispatcher = cowboy_router:compile([ {'_', [ {"/host/[...]", http_protocol, [http_host_handler]}, + {"/api/[...]", http_protocol, [http_api_handler]}, {"/router/[...]", http_protocol, [http_router_handler]} ]} ]), diff --git a/apps/iot/src/iot_http_client.erl b/apps/iot/src/iot_http_client.erl new file mode 100644 index 0000000..f1f73f6 --- /dev/null +++ b/apps/iot/src/iot_http_client.erl @@ -0,0 +1,40 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 03. 3月 2023 11:48 +%%%------------------------------------------------------------------- +-module(iot_http_client). +-author("licheng5"). + +%% API +-export([post/2]). + +post(Url, Body) when is_list(Url), is_binary(Body) -> + case hackney:request(post, Url, [], Body) of + {ok, 200, _, ClientRef} -> + case hackney:body(ClientRef) of + {ok, RespBody} -> + lager:debug("[iot_http_client] url: ~p, response is: ~p", [Url, RespBody]), + ok; + {error, Reason} -> + lager:warning("[iot_http_client] url: ~p, get error: ~p", [Url, Reason]), + {error, failed} + end; + + {ok, HttpCode, _, ClientRef} -> + case hackney:body(ClientRef) of + {ok, RespBody} -> + lager:debug("[iot_http_client] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), + ok; + {error, Reason} -> + lager:warning("[iot_http_client] url: ~p, http_code: ~p, get error: ~p", [Url, HttpCode, Reason]), + {error, failed} + end; + + {error, Reason} -> + lager:warning("[iot_http_client] url: ~p, get error: ~p", [Url, Reason]), + {error, failed} + end. \ No newline at end of file diff --git a/apps/iot/src/iot_mock.erl b/apps/iot/src/iot_mock.erl index afa7708..8eb16e3 100644 --- a/apps/iot/src/iot_mock.erl +++ b/apps/iot/src/iot_mock.erl @@ -12,6 +12,7 @@ %% API -export([insert_hosts/0, insert_services/1, insert_terminals/1, insert_routers/0]). +-export([start_router/1]). -export([rsa_encode/1]). insert_hosts() -> @@ -81,6 +82,16 @@ insert_routers() -> router_model:add_router(R) end, lists:seq(1, 100)). +start_router(Id0) when is_integer(Id0) -> + R = #router{ + router_id = Id0, + name = <<"计费电表"/utf8>>, + rule = <<"测试规则"/utf8>>, + endpoint = #http_endpoint{url = <<"http://127.0.0.1:8080/data">>}, + status = 1 + }, + router_model:add_router(R), + iot_router_sup:start_new_router(R). rsa_encode(Data) when is_binary(Data) -> %% 读取相关配置 diff --git a/apps/iot/src/iot_router_sup.erl b/apps/iot/src/iot_router_sup.erl index e369e8b..3f85710 100644 --- a/apps/iot/src/iot_router_sup.erl +++ b/apps/iot/src/iot_router_sup.erl @@ -47,8 +47,12 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, %% 启动目前生效的全部转发规则 - {ok, Routers} = router_model:get_all_valid_routers(), - Specs = lists:map(fun generate_router_spec/1, Routers), + Specs = case router_model:get_all_valid_routers() of + {ok, Routers} -> + lists:map(fun generate_router_spec/1, Routers); + error -> + [] + end, {ok, {SupFlags, Specs}}. From 9ecebfde0c27bf6d56403393add15f85063421b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 3 Mar 2023 14:24:56 +0800 Subject: [PATCH 11/24] fix --- apps/iot/src/iot_app.erl | 3 +++ apps/iot/src/iot_util.erl | 10 ++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 07414f3..a99c2d2 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -13,6 +13,9 @@ start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), %% 启动数据库 mnesia:start(), + Tables = mnesia:system_info(tables), + iot_util:assert_exec(lists:member(router, [Tables]), fun() -> mnesia:wait_for_tables([router], infinity) end), + %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index ec01914..ecd9b0e 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -13,7 +13,7 @@ -export([timestamp/0, number_format/2, current_time/0]). -export([step/3, chunks/2, rand_bytes/1, uuid/0]). -export([json_data/1, json_error/2]). --export([queue_limited_in/3]). +-export([queue_limited_in/3, assert_exec/2]). %% 时间,精确到毫秒 timestamp() -> @@ -80,4 +80,10 @@ queue_limited_in(Item, Q, Num) when is_integer(Num) -> queue:in(Item, Q1); false -> queue:in(Item, Q) - end. \ No newline at end of file + end. + +-spec assert_exec(boolean(), any()) -> any(). +assert_exec(true, Fun) when is_function(Fun, 0) -> + {ok, Fun()}; +assert_exec(_, _) -> + aborted. \ No newline at end of file From 29a9f46340a4c0c83ba23f5f95a3d2927e93fb28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 14:26:12 +0800 Subject: [PATCH 12/24] rename --- apps/iot/src/{handler => http_handler}/http_api_handler.erl | 0 apps/iot/src/{handler => http_handler}/http_host_handler.erl | 0 apps/iot/src/{handler => http_handler}/http_router_handler.erl | 0 apps/iot/src/{handler => mqtt_handler}/iot_message_handler.erl | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename apps/iot/src/{handler => http_handler}/http_api_handler.erl (100%) rename apps/iot/src/{handler => http_handler}/http_host_handler.erl (100%) rename apps/iot/src/{handler => http_handler}/http_router_handler.erl (100%) rename apps/iot/src/{handler => mqtt_handler}/iot_message_handler.erl (100%) diff --git a/apps/iot/src/handler/http_api_handler.erl b/apps/iot/src/http_handler/http_api_handler.erl similarity index 100% rename from apps/iot/src/handler/http_api_handler.erl rename to apps/iot/src/http_handler/http_api_handler.erl diff --git a/apps/iot/src/handler/http_host_handler.erl b/apps/iot/src/http_handler/http_host_handler.erl similarity index 100% rename from apps/iot/src/handler/http_host_handler.erl rename to apps/iot/src/http_handler/http_host_handler.erl diff --git a/apps/iot/src/handler/http_router_handler.erl b/apps/iot/src/http_handler/http_router_handler.erl similarity index 100% rename from apps/iot/src/handler/http_router_handler.erl rename to apps/iot/src/http_handler/http_router_handler.erl diff --git a/apps/iot/src/handler/iot_message_handler.erl b/apps/iot/src/mqtt_handler/iot_message_handler.erl similarity index 100% rename from apps/iot/src/handler/iot_message_handler.erl rename to apps/iot/src/mqtt_handler/iot_message_handler.erl From 9e32c0b196ff6e56a2820adafb4d855f234e56b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 15:29:24 +0800 Subject: [PATCH 13/24] fix --- apps/iot/include/iot.hrl | 2 + .../iot/src/http_handler/http_iot_handler.erl | 59 +++++++++++++++++++ apps/iot/src/model/host_model.erl | 32 +++++++++- .../src/mqtt_handler/iot_message_handler.erl | 2 +- 4 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 apps/iot/src/http_handler/http_iot_handler.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 75c4f80..9374ba4 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -100,6 +100,8 @@ model :: binary(), %% 单元网格编号 cell_id :: integer(), + %% rsa公钥 + public_rsa :: binary(), %% aes的key, 后续通讯需要基于这个加密 aes :: binary(), metric = #host_metric{}, diff --git a/apps/iot/src/http_handler/http_iot_handler.erl b/apps/iot/src/http_handler/http_iot_handler.erl new file mode 100644 index 0000000..c4de7a4 --- /dev/null +++ b/apps/iot/src/http_handler/http_iot_handler.erl @@ -0,0 +1,59 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 3月 2023 14:29 +%%%------------------------------------------------------------------- +-module(http_iot_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +handle_request("GET", "/api/booking", _, _Params) -> + {ok, 200, iot_util:json_data(<<"success">>)}; + +%% 处理命令下发 +handle_request("POST", "/iot/send_command", _, Params = #{<<"host_id">> := HostId}) -> + lager:debug("body is: ~p", [Params]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes, public_rsa = PubKey}} -> + Reply = #{ + <<"a">> => true, + <<"aes">> => Aes, + <<"reply">> => <<"client.reply.", HostId/binary>> + }, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + %% TODO 发送消息到终端 + lager:debug("enc_reply is: ~p", [EncReply]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 处理主机的授权 +handle_request("POST", "/iot/host_auth", _, Params = #{<<"host_id">> := HostId}) -> + lager:debug("body is: ~p", [Params]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes, public_rsa = PubKey}} -> + Reply = #{ + <<"a">> => true, + <<"aes">> => Aes, + <<"reply">> => <<"client.reply.", HostId/binary>> + }, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + %% TODO 发送消息到终端 + lager:debug("enc_reply is: ~p", [EncReply]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end. \ No newline at end of file diff --git a/apps/iot/src/model/host_model.erl b/apps/iot/src/model/host_model.erl index bad464b..9c940d1 100644 --- a/apps/iot/src/model/host_model.erl +++ b/apps/iot/src/model/host_model.erl @@ -12,7 +12,7 @@ -include_lib("stdlib/include/qlc.hrl"). %% API --export([get_host/1, get_hosts/3, get_stat/0, add_host/1, change_status/2, delete/1, table_size/0, find_hosts/3]). +-export([get_host/1, get_hosts/3, get_stat/0, add_host/1, change_status/2, delete/1, table_size/0, find_hosts/3, activate/1]). -export([to_map/1]). get_host(HostId) when is_binary(HostId) -> @@ -97,6 +97,36 @@ change_status(HostId, Status) when is_binary(HostId), is_integer(Status) -> {error, Reason} end. +-spec activate(HostId :: binary()) -> ok | {error, Reason}. +activate(HostId) when is_binary(HostId) -> + Fun = fun() -> + case mnesia:read(host, HostId) of + [] -> + mnesia:abort(<<"host not found">>); + [Host = #host{status = Status}] -> + case Status =:= ?HOST_STATUS_INACTIVE of + true -> + Aes = iot_util:rand_bytes(16), + NHost = Host#host{ + aes = Aes, + activated_ts = iot_util:current_time(), + update_ts = iot_util:current_time(), + status = ?HOST_STATUS_ONLINE + }, + ok = mnesia:write(host, NHost, write), + NHost; + false -> + mnesia:abort(<<"host status invalid">>) + end + end + end, + case mnesia:transaction(Fun) of + {atomic, Host} -> + {ok, Host}; + {aborted, Reason} -> + {error, Reason} + end. + -spec delete(HostId :: binary()) -> ok | {error, Reason :: any()}. delete(HostId) when is_binary(HostId) -> case mnesia:transaction(fun() -> mnesia:delete(host, HostId, write) end) of diff --git a/apps/iot/src/mqtt_handler/iot_message_handler.erl b/apps/iot/src/mqtt_handler/iot_message_handler.erl index 7c01562..25e2449 100644 --- a/apps/iot/src/mqtt_handler/iot_message_handler.erl +++ b/apps/iot/src/mqtt_handler/iot_message_handler.erl @@ -81,7 +81,7 @@ handle(<<"server.register">>, Msg = #{<<"c_id">> := ClientId, <<"r">> := PubKey, handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> case host_model:get_host(HostId) of {ok, #host{aes = Aes}} -> - Services = service_model:get_services(HostId), + Services = service_model:get_host_services(HostId), PlainData = iot_cipher_aes:decrypt(Aes, Aes, Data), case jiffy:decode(PlainData, [return_maps]) of Infos when is_list(Infos) -> From f593aa00de022b75fe35ce4e345f740f210c950c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 16:05:34 +0800 Subject: [PATCH 14/24] fix --- apps/iot/src/iot_emqtt_client.erl | 119 ++++++++++++++++++++++++++++++ apps/iot/src/iot_sup.erl | 9 +++ config/sys.config | 12 +++ rebar.config | 1 + rebar.lock | 65 ---------------- 5 files changed, 141 insertions(+), 65 deletions(-) create mode 100644 apps/iot/src/iot_emqtt_client.erl delete mode 100644 rebar.lock diff --git a/apps/iot/src/iot_emqtt_client.erl b/apps/iot/src/iot_emqtt_client.erl new file mode 100644 index 0000000..30667bf --- /dev/null +++ b/apps/iot/src/iot_emqtt_client.erl @@ -0,0 +1,119 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 3月 2023 15:42 +%%%------------------------------------------------------------------- +-module(iot_emqtt_client). +-author("licheng5"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-export([publish/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + conn_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +publish(Topic, Message, Qos) when is_binary(Topic), is_binary(Message), is_integer(Qos), Qos == 0; Qos == 1 -> + gen_server:call(?SERVER, {publish, Topic, Message, Qos}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, ServerOpts} = application:get_env(iot, emqx_server), + {ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), + {ok, _Props} = emqtt:connect(ConnPid), + + SubOpts = [{qos, 1}], + {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]), + + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({publish, Topic, Message, Qos}, _From, State = #state{conn_pid = ConnPid}) -> + if + Qos == 0 -> + ok = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 0}]); + Qos == 1 -> + {ok, _PktId} = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, 1}]); + true -> + ok + end, + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 3cf175d..924e656 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -35,6 +35,15 @@ init([]) -> shutdown => 2000, type => supervisor, modules => ['iot_router_sup'] + }, + + #{ + id => 'iot_emqtt_client', + start => {'iot_emqtt_client', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_emqtt_client'] } ], {ok, {SupFlags, ChildSpecs}}. diff --git a/config/sys.config b/config/sys.config index 51ea41e..53513b0 100644 --- a/config/sys.config +++ b/config/sys.config @@ -5,7 +5,19 @@ {acceptors, 500}, {max_connections, 10240}, {backlog, 10240} + ]}, + + %% 目标服务器地址 + {emqx_server, [ + {host, ""}, + {port, 18080}, + {tcp_opts, []}, + {username, ""}, + {password, ""}, + {keepalive, 'Keepalive'}, + {retry_interval, 5} ]} + ]}, %% 系统日志配置,系统日志为lager, 支持日志按日期自动分割 diff --git a/rebar.config b/rebar.config index 089d748..e04e0d7 100644 --- a/rebar.config +++ b/rebar.config @@ -6,6 +6,7 @@ {cowboy, ".*", {git, "https://github.com/ninenines/cowboy.git", {tag, "2.5.0"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, + {emqtt, ".*", {git, "https://github.com/emqx/emqtt", {tag, "v1.2.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}. diff --git a/rebar.lock b/rebar.lock deleted file mode 100644 index c0d8370..0000000 --- a/rebar.lock +++ /dev/null @@ -1,65 +0,0 @@ -{"1.2.0", -[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.2">>},1}, - {<<"cowboy">>, - {git,"https://github.com/ninenines/cowboy.git", - {ref,"c998673eb009da2ea4dc0e6ef0332534cf679cc4"}}, - 0}, - {<<"cowlib">>, - {git,"https://github.com/ninenines/cowlib", - {ref,"106ba84bb04537879d8ce59321a04e0682110b91"}}, - 1}, - {<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1}, - {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"hackney">>, - {git,"https://github.com/benoitc/hackney.git", - {ref,"f3e9292db22c807e73f57a8422402d6b423ddf5f"}}, - 0}, - {<<"idna">>,{pkg,<<"idna">>,<<"6.0.1">>},1}, - {<<"jiffy">>, - {git,"https://github.com/davisp/jiffy.git", - {ref,"9ea1b35b6e60ba21dfd4adbd18e7916a831fd7d4"}}, - 0}, - {<<"lager">>, - {git,"https://github.com/erlang-lager/lager.git", - {ref,"459a3b2cdd9eadd29e5a7ce5c43932f5ccd6eb88"}}, - 0}, - {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1}, - {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1}, - {<<"parse_trans">>, - {git,"https://github.com/uwiger/parse_trans", - {ref,"6f3645afb43c7c57d61b54ef59aecab288ce1013"}}, - 0}, - {<<"poolboy">>, - {git,"https://github.com/devinus/poolboy.git", - {ref,"3bb48a893ff5598f7c73731ac17545206d259fac"}}, - 0}, - {<<"ranch">>, - {git,"https://github.com/ninenines/ranch", - {ref,"9b8ed47d789412b0021bfc1f94f1c17c387c721c"}}, - 1}, - {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1}, - {<<"sync">>, - {git,"https://github.com/rustyio/sync.git", - {ref,"3f0049e809ffe303ae2cd395217a025ce6e758ae"}}, - 0}, - {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.5.0">>},2}]}. -[ -{pkg_hash,[ - {<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>}, - {<<"fs">>, <<"9D147B944D60CFA48A349F12D06C8EE71128F610C90870BDF9A6773206452ED0">>}, - {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>}, - {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, - {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, - {<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>}, - {<<"unicode_util_compat">>, <<"8516502659002CEC19E244EBD90D312183064BE95025A319A6C7E89F4BCCD65B">>}]}, -{pkg_hash_ext,[ - {<<"certifi">>, <<"3B3B5F36493004AC3455966991EAF6E768CE9884693D9968055AEEEB1E575040">>}, - {<<"fs">>, <<"EF94E95FFE79916860649FED80AC62B04C322B0BB70F5128144C026B4D171F8B">>}, - {<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, - {<<"idna">>, <<"A02C8A1C4FD601215BB0B0324C8A6986749F807CE35F25449EC9E69758708122">>}, - {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, - {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, - {<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>}, - {<<"unicode_util_compat">>, <<"D48D002E15F5CC105A696CF2F1BBB3FC72B4B770A184D8420C8DB20DA2674B38">>}]} -]. From 059fc6b53f8fd46b7e7b3d70acd0cf070a5b7de8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 16:39:45 +0800 Subject: [PATCH 15/24] fix --- apps/iot/include/iot.hrl | 4 + .../iot/src/http_handler/http_iot_handler.erl | 104 ++++++++++++++++++ ...ndler.erl => iot_mqtt_message_handler.erl} | 26 ++++- rebar.lock | 76 +++++++++++++ 4 files changed, 208 insertions(+), 2 deletions(-) rename apps/iot/src/mqtt_handler/{iot_message_handler.erl => iot_mqtt_message_handler.erl} (85%) create mode 100644 rebar.lock diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 9374ba4..f9c1b4d 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -30,6 +30,8 @@ -record(memory_metric, { %% 使用量 used = 0, + %% 空余 + free = 0, %% 总量 total = 0 }). @@ -38,6 +40,8 @@ -record(disk_metric, { %% 使用量 used = 0, + %% 空余 + free = 0, %% 总量 total = 0 }). diff --git a/apps/iot/src/http_handler/http_iot_handler.erl b/apps/iot/src/http_handler/http_iot_handler.erl index c4de7a4..316d576 100644 --- a/apps/iot/src/http_handler/http_iot_handler.erl +++ b/apps/iot/src/http_handler/http_iot_handler.erl @@ -16,6 +16,110 @@ handle_request("GET", "/api/booking", _, _Params) -> {ok, 200, iot_util:json_data(<<"success">>)}; +%% 下发参数 +handle_request("POST", "/iot/send_params", _, PostParams = #{<<"host_id">> := HostId, <<"service_name">> := ServiceName, <<"params">> := Params}) -> + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 1, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"to">> => ServiceName, + <<"t">> => 10, + <<"m">> => Params + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 下发采集项 +handle_request("POST", "/iot/send_metrics", _, + PostParams = #{<<"host_id">> := HostId, <<"service_name">> := ServiceName, <<"metrics">> := Metrics}) -> + + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 2, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"to">> => ServiceName, + <<"t">> => 10, + <<"m">> => Metrics + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 下发微服务 +handle_request("POST", "/iot/send_mirco_service", _, + PostParams = #{<<"host_id">> := HostId, <<"args">> := Args}) -> + + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 3, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"t">> => 10, + <<"m">> => Args + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 下发数据流图 +handle_request("POST", "/iot/send_data_flow", _, + PostParams = #{<<"host_id">> := HostId, <<"args">> := Args}) -> + + lager:debug("body is: ~p", [PostParams]), + + case host_model:activate(HostId) of + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(404, Reason)}; + {ok, #host{aes = Aes}} -> + Reply = #{ + <<"t">> => 4, + <<"b">> => #{ + <<"t_id">> => iot_util:uuid() , + <<"t">> => 10, + <<"m">> => Args + } + }, + EncMsg = iot_cipher_aes:encrypt(Aes, Aes, Reply), + + iot_emqtt_client:publish(<<"clients.cmd.", HostId/binary>>, EncMsg, 1), + lager:debug("enc_reply is: ~p", [EncMsg]), + + {ok, 200, iot_util:json_data(<<"success">>)} + end; + %% 处理命令下发 handle_request("POST", "/iot/send_command", _, Params = #{<<"host_id">> := HostId}) -> lager:debug("body is: ~p", [Params]), diff --git a/apps/iot/src/mqtt_handler/iot_message_handler.erl b/apps/iot/src/mqtt_handler/iot_mqtt_message_handler.erl similarity index 85% rename from apps/iot/src/mqtt_handler/iot_message_handler.erl rename to apps/iot/src/mqtt_handler/iot_mqtt_message_handler.erl index 25e2449..4a3702c 100644 --- a/apps/iot/src/mqtt_handler/iot_message_handler.erl +++ b/apps/iot/src/mqtt_handler/iot_mqtt_message_handler.erl @@ -6,7 +6,7 @@ %%% @end %%% Created : 18. 2月 2023 21:39 %%%------------------------------------------------------------------- --module(iot_message_handler). +-module(iot_mqtt_message_handler). -author("aresei"). -include("iot.hrl"). @@ -107,7 +107,29 @@ handle(<<"server.data">>, #{<<"c_id">> := HostId, <<"d">> := Data}) -> end; undefined -> lager:warning("[iot_message_handler] host_id: ~p, not exists", [HostId]) -end. +end; + +%% 处理服务器的ping +handle(<<"server.ping">>, #{<<"c">> := HostId, <<"at">> := At, + <<"h">> := #{<<"fm">> := FreeMemory, <<"fd">> := FreeDisk, <<"cp">> := CpuLoad}}) -> + + case host_model:get_host(HostId) of + {ok, Host=#host{metric = Metric = #host_metric{disk = Disk, memory = Memory}}} -> + NMetric = Metric#host_metric{ + disk = Disk#disk_metric{free = FreeDisk}, + memory = Memory#memory_metric{free = FreeMemory} + }, + + case mnesia:transaction(fun() -> mnesia:write(host, Host#host{metric = NMetric}, write) end) of + {atomic, ok} -> + ok; + {error, Reason} -> + lager:warning("[iot_message_handler] host_id: ~p, ping get error: ~p", [HostId, Reason]) + end; + + undefined -> + lager:warning("[iot_message_handler] host_id: ~p, not exists", [HostId]) + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..06e78cf --- /dev/null +++ b/rebar.lock @@ -0,0 +1,76 @@ +{"1.2.0", +[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.2">>},1}, + {<<"cowboy">>, + {git,"https://github.com/ninenines/cowboy.git", + {ref,"c998673eb009da2ea4dc0e6ef0332534cf679cc4"}}, + 0}, + {<<"cowlib">>, + {git,"https://github.com/ninenines/cowlib", + {ref,"106ba84bb04537879d8ce59321a04e0682110b91"}}, + 1}, + {<<"emqtt">>, + {git,"https://github.com/emqx/emqtt", + {ref,"55e50041cc5b3416067c120eadb8774f1d3d1f4a"}}, + 0}, + {<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1}, + {<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},1}, + {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, + {<<"gun">>, + {git,"https://github.com/ninenines/gun", + {ref,"e7dd9f227e46979d8073e71c683395a809b78cb4"}}, + 1}, + {<<"hackney">>, + {git,"https://github.com/benoitc/hackney.git", + {ref,"f3e9292db22c807e73f57a8422402d6b423ddf5f"}}, + 0}, + {<<"idna">>,{pkg,<<"idna">>,<<"6.0.1">>},1}, + {<<"jiffy">>, + {git,"https://github.com/davisp/jiffy.git", + {ref,"9ea1b35b6e60ba21dfd4adbd18e7916a831fd7d4"}}, + 0}, + {<<"lager">>, + {git,"https://github.com/erlang-lager/lager.git", + {ref,"459a3b2cdd9eadd29e5a7ce5c43932f5ccd6eb88"}}, + 0}, + {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1}, + {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1}, + {<<"parse_trans">>, + {git,"https://github.com/uwiger/parse_trans", + {ref,"6f3645afb43c7c57d61b54ef59aecab288ce1013"}}, + 0}, + {<<"poolboy">>, + {git,"https://github.com/devinus/poolboy.git", + {ref,"3bb48a893ff5598f7c73731ac17545206d259fac"}}, + 0}, + {<<"ranch">>, + {git,"https://github.com/ninenines/ranch", + {ref,"6bbc8431d513d9bbed7817bc1bcb3b17ef26cb35"}}, + 1}, + {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1}, + {<<"sync">>, + {git,"https://github.com/rustyio/sync.git", + {ref,"aa27b66ccfbfc798b57288af4cf7dc386e205d68"}}, + 0}, + {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.5.0">>},2}]}. +[ +{pkg_hash,[ + {<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>}, + {<<"fs">>, <<"9D147B944D60CFA48A349F12D06C8EE71128F610C90870BDF9A6773206452ED0">>}, + {<<"getopt">>, <<"C73A9FA687B217F2FF79F68A3B637711BB1936E712B521D8CE466B29CBF7808A">>}, + {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, + {<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>}, + {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, + {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, + {<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>}, + {<<"unicode_util_compat">>, <<"8516502659002CEC19E244EBD90D312183064BE95025A319A6C7E89F4BCCD65B">>}]}, +{pkg_hash_ext,[ + {<<"certifi">>, <<"3B3B5F36493004AC3455966991EAF6E768CE9884693D9968055AEEEB1E575040">>}, + {<<"fs">>, <<"EF94E95FFE79916860649FED80AC62B04C322B0BB70F5128144C026B4D171F8B">>}, + {<<"getopt">>, <<"53E1AB83B9CEB65C9672D3E7A35B8092E9BDC9B3EE80721471A161C10C59959C">>}, + {<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, + {<<"idna">>, <<"A02C8A1C4FD601215BB0B0324C8A6986749F807CE35F25449EC9E69758708122">>}, + {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, + {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, + {<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>}, + {<<"unicode_util_compat">>, <<"D48D002E15F5CC105A696CF2F1BBB3FC72B4B770A184D8420C8DB20DA2674B38">>}]} +]. From d44889aff274ff3e65f666b14fcd68673d0b74c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 16:40:13 +0800 Subject: [PATCH 16/24] fix --- apps/iot/src/{mqtt_handler => }/iot_mqtt_message_handler.erl | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename apps/iot/src/{mqtt_handler => }/iot_mqtt_message_handler.erl (100%) diff --git a/apps/iot/src/mqtt_handler/iot_mqtt_message_handler.erl b/apps/iot/src/iot_mqtt_message_handler.erl similarity index 100% rename from apps/iot/src/mqtt_handler/iot_mqtt_message_handler.erl rename to apps/iot/src/iot_mqtt_message_handler.erl From 02cf888c32d9f0cef84f7eba4ec5b8dfd3731fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Mon, 6 Mar 2023 16:52:05 +0800 Subject: [PATCH 17/24] fix --- apps/iot/src/iot_app.erl | 3 ++- apps/iot/src/iot_emqtt_client.erl | 10 +++++----- apps/iot/src/model/host_model.erl | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index a99c2d2..a647c10 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -8,6 +8,7 @@ -behaviour(application). -export([start/2, stop/1]). +-export([start_http_server/0]). start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), @@ -19,7 +20,7 @@ start(_StartType, _StartArgs) -> %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), - start_http_server(), + %start_http_server(), iot_sup:start_link(). diff --git a/apps/iot/src/iot_emqtt_client.erl b/apps/iot/src/iot_emqtt_client.erl index 30667bf..60a53f6 100644 --- a/apps/iot/src/iot_emqtt_client.erl +++ b/apps/iot/src/iot_emqtt_client.erl @@ -48,12 +48,12 @@ start_link() -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([]) -> - {ok, ServerOpts} = application:get_env(iot, emqx_server), - {ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), - {ok, _Props} = emqtt:connect(ConnPid), + %{ok, ServerOpts} = application:get_env(iot, emqx_server), + %{ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), + %{ok, _Props} = emqtt:connect(ConnPid), - SubOpts = [{qos, 1}], - {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]), + %SubOpts = [{qos, 1}], + %{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, #{}, [{<<"hello">>, SubOpts}]), {ok, #state{}}. diff --git a/apps/iot/src/model/host_model.erl b/apps/iot/src/model/host_model.erl index 9c940d1..284e281 100644 --- a/apps/iot/src/model/host_model.erl +++ b/apps/iot/src/model/host_model.erl @@ -97,7 +97,7 @@ change_status(HostId, Status) when is_binary(HostId), is_integer(Status) -> {error, Reason} end. --spec activate(HostId :: binary()) -> ok | {error, Reason}. +-spec activate(HostId :: binary()) -> ok | {error, Reason :: any()}. activate(HostId) when is_binary(HostId) -> Fun = fun() -> case mnesia:read(host, HostId) of From a81c64291ab7a8cb2c360c2026520dbe942008ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 14:18:10 +0800 Subject: [PATCH 18/24] http server --- apps/iot/src/iot_app.erl | 2 +- rebar.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index a647c10..c771355 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -20,7 +20,7 @@ start(_StartType, _StartArgs) -> %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), - %start_http_server(), + start_http_server(), iot_sup:start_link(). diff --git a/rebar.lock b/rebar.lock index 06e78cf..fc8b7b9 100644 --- a/rebar.lock +++ b/rebar.lock @@ -44,7 +44,7 @@ 0}, {<<"ranch">>, {git,"https://github.com/ninenines/ranch", - {ref,"6bbc8431d513d9bbed7817bc1bcb3b17ef26cb35"}}, + {ref,"9b8ed47d789412b0021bfc1f94f1c17c387c721c"}}, 1}, {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1}, {<<"sync">>, From a425947db40503788016472b8eca2f4615710884 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 14:55:53 +0800 Subject: [PATCH 19/24] add model --- apps/iot/include/iot.hrl | 32 ++++++ .../src/http_handler/http_host_handler.erl | 1 - .../http_handler/http_scenario_handler.erl | 88 +++++++++++++++++ apps/iot/src/model/scenario_deploy_model.erl | 97 +++++++++++++++++++ apps/iot/src/model/scenario_model.erl | 87 +++++++++++++++++ 5 files changed, 304 insertions(+), 1 deletion(-) create mode 100644 apps/iot/src/http_handler/http_scenario_handler.erl create mode 100644 apps/iot/src/model/scenario_deploy_model.erl create mode 100644 apps/iot/src/model/scenario_model.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index f9c1b4d..d607d14 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -166,6 +166,38 @@ status = 0 }). +%% scenario 应用场景 +-record(scenario, { + %% 场景id,自增id + scenario_id :: integer(), + %% 名称 + name :: binary(), + %% 场景描述 + desc :: binary(), + %% 场景规则数据 + rule :: binary(), + %% 最后更新时间 + update_ts = 0 :: integer(), + %% 状态 + status :: integer() +}). + +%% 场景部署关系表 +-record(scenario_deploy, { + %% id,自增id + deploy_id :: integer(), + %% 场景id + scenario_id :: integer(), + %% 主机id + host_id :: binary(), + %% 创建时间 + create_ts = 0 :: integer(), + %% 更新时间 + update_ts = 0 :: integer(), + %% 状态 + status = 0:: integer() +}). + -record(http_endpoint, { url = <<>> }). diff --git a/apps/iot/src/http_handler/http_host_handler.erl b/apps/iot/src/http_handler/http_host_handler.erl index 75bdeaf..dcbe936 100644 --- a/apps/iot/src/http_handler/http_host_handler.erl +++ b/apps/iot/src/http_handler/http_host_handler.erl @@ -72,7 +72,6 @@ handle_request(_, "/host/list", Params, PostParams) -> handle_request("GET", "/host/detail", #{<<"id">> := HostId}, _) -> lager:debug("[host_handler] detail id is: ~p", [HostId]), - timer:tc(), case host_model:get_host(HostId) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; diff --git a/apps/iot/src/http_handler/http_scenario_handler.erl b/apps/iot/src/http_handler/http_scenario_handler.erl new file mode 100644 index 0000000..ab7b741 --- /dev/null +++ b/apps/iot/src/http_handler/http_scenario_handler.erl @@ -0,0 +1,88 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(http_scenario_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_request(_, "/scenario/list", Params, PostParams) -> + Page0 = maps:get(<<"page">>, Params, <<"1">>), + Size0 = maps:get(<<"size">>, Params, <<"10">>), + Page = binary_to_integer(Page0), + Size = binary_to_integer(Size0), + + true = Page > 0 andalso Size > 0, + Start = (Page - 1) * Size, + + %% 处理查询条件 + Name = maps:get(<<"name">>, PostParams, <<"">>), + + MatchHead = #scenario{name = '$1', _ = '_'}, + Guard = [], + Guard1 = case Model =/= <<"">> of + true -> + [{'=:=', '$1', Model}|Guard]; + false -> + Guard + end, + + Guard2 = case CellId1 > 0 of + true -> + [{'=:=', '$2', CellId1}|Guard1]; + false -> + Guard1 + end, + + Result = ['$_'], + + case host_model:get_hosts({MatchHead, Guard2, Result}, Start, Size) of + {ok, Hosts, TotalNum} -> + Response = #{ + <<"hosts">> => lists:map(fun(Host) -> host_model:to_map(Host) end, Hosts), + <<"stat">> => host_model:get_stat(), + <<"total_num">> => TotalNum + }, + + lager:debug("resp is: ~p", [Response]), + + {ok, 200, iot_util:json_data(Response)}; + {error, Reason} -> + lager:warning("[host_handler] get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, <<"database error">>)} + end; + +handle_request("GET", "/host/detail", #{<<"id">> := HostId}, _) -> + lager:debug("[host_handler] detail id is: ~p", [HostId]), + case host_model:get_host(HostId) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + {ok, Host} -> + HostInfo = host_model:to_map(Host), + %% 获取终端信息 + {ok, Terminals0} = terminal_model:get_host_terminals(HostId), + Terminals = lists:map(fun(E) -> terminal_model:to_map(E) end, Terminals0), + HostInfo1 = maps:put(<<"terminals">>, Terminals, HostInfo), + %% 获取微服务信息 + {ok, Services0} = service_model:get_host_services(HostId), + Services = lists:map(fun(S) -> service_model:to_map(S) end, Services0), + HostInfo2 = maps:put(<<"services">>, Services, HostInfo1), + + {ok, 200, iot_util:json_data(HostInfo2)} + end; + +handle_request("POST", "/host/update", _, _Params) -> + lager:debug("[host_handler] post params is: ~p", [_Params]), + + {ok, 200, iot_util:json_data(<<"success">>)}. \ No newline at end of file diff --git a/apps/iot/src/model/scenario_deploy_model.erl b/apps/iot/src/model/scenario_deploy_model.erl new file mode 100644 index 0000000..f99ea3f --- /dev/null +++ b/apps/iot/src/model/scenario_deploy_model.erl @@ -0,0 +1,97 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(scenario_deploy_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB_NAME, scenario_deploy). + +%% API +-export([get_host_deploy_list/1, get_scenario_deploy_list/1, add_deploy/1, change_status/2, delete/1, table_size/0]). +-export([to_map/1]). + +-spec get_host_deploy_list(HostId :: binary()) -> {ok, List :: [#scenario_deploy{}]} | error. +get_host_deploy_list(HostId) when is_binary(HostId) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB_NAME), E#scenario_deploy.host_id =:= HostId]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Items} -> + {ok, Items}; + {aborted, _} -> + error + end. + +-spec get_scenario_deploy_list(ScenarioId :: integer()) -> {ok, List :: [#scenario_deploy{}]} | error. +get_scenario_deploy_list(ScenarioId) when is_integer(ScenarioId) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB_NAME), E#scenario_deploy.scenario_id =:= ScenarioId]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Items} -> + {ok, Items}; + {aborted, _} -> + error + end. + +-spec add_deploy(Deploy :: #scenario_deploy{}) -> ok | {error, Reason :: any()}. +add_deploy(Deploy = #scenario_deploy{}) -> + case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, Deploy, write) end) of + {atomic, _} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(DeployId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(DeployId, Status) when is_integer(DeployId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(?TAB_NAME, DeployId) of + [] -> + mnesia:abort(<<"deploy not found">>); + [Deploy] -> + mnesia:write(?TAB_NAME, Deploy#scenario_deploy{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(DeployId :: binary()) -> ok | {error, Reason :: any()}. +delete(DeployId) when is_integer(DeployId) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, DeployId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(?TAB_NAME, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#scenario_deploy{deploy_id = DeployId, scenario_id = ScenarioId, host_id = HostId, create_ts = CreateTs, update_ts = UpdateTs, status = Status}) -> + #{ + <<"deploy_id">> => DeployId, + <<"scenario_id">> => ScenarioId, + <<"host_id">> => HostId, + <<"create_ts">> => CreateTs, + <<"update_ts">> => UpdateTs, + <<"status">> => Status + }. \ No newline at end of file diff --git a/apps/iot/src/model/scenario_model.erl b/apps/iot/src/model/scenario_model.erl new file mode 100644 index 0000000..d573465 --- /dev/null +++ b/apps/iot/src/model/scenario_model.erl @@ -0,0 +1,87 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(scenario_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +%% API +-export([get_scenario/1, get_scenario_list/3, add_scenario/1, change_status/2, delete/1, table_size/0]). +-export([to_map/1]). + +get_scenario(ScenarioId) when is_integer(ScenarioId) -> + case mnesia:dirty_read(scenario, ScenarioId) of + [Scenario = #scenario{}] -> + {ok, Scenario}; + _ -> + undefined + end. + +%% 获取app信息 +-spec get_scenario_list(Filter :: any(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: list(), TotalNum :: integer()} | + {error, Reason :: any()}. +get_scenario_list(Spec, Start, Limit) when is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Items0 = mnesia:dirty_select(scenario, [Spec]), + Items = lists:reverse(Items0), + NItems = lists:sublist(Items, Start + 1, Limit), + {ok, NItems, length(Items)}. + +-spec add_scenario(Scenario :: #scenario{}) -> ok | {error, Reason :: any()}. +add_scenario(Scenario = #scenario{}) -> + case mnesia:transaction(fun() -> mnesia:write(scenario, Scenario, write) end) of + {atomic, _} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(ScenarioId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(ScenarioId, Status) when is_integer(ScenarioId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(scenario, ScenarioId) of + [] -> + mnesia:abort(<<"host not found">>); + [Scenario] -> + mnesia:write(scenario, Scenario#scenario{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(ScenarioId :: binary()) -> ok | {error, Reason :: any()}. +delete(ScenarioId) when is_integer(ScenarioId) -> + case mnesia:transaction(fun() -> mnesia:delete(scenario, ScenarioId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(scenario, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#scenario{scenario_id = ScenarioId, name = Name, desc = Desc, rule = Rule, update_ts = UpdateTs, status = Status}) -> + #{ + <<"scenario_id">> => ScenarioId, + <<"name">> => Name, + <<"desc">> => Desc, + <<"rule">> => Rule, + <<"update_ts">> => UpdateTs, + <<"status">> => Status + }. \ No newline at end of file From 148773241ce3ae78ec226f527ab2fac84eb1adb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 15:34:38 +0800 Subject: [PATCH 20/24] fix handler --- .../src/http_handler/http_host_handler.erl | 18 +++- .../http_handler/http_scenario_handler.erl | 90 +++++++++++-------- apps/iot/src/model/scenario_model.erl | 2 +- 3 files changed, 72 insertions(+), 38 deletions(-) diff --git a/apps/iot/src/http_handler/http_host_handler.erl b/apps/iot/src/http_handler/http_host_handler.erl index dcbe936..3ab3a8a 100644 --- a/apps/iot/src/http_handler/http_host_handler.erl +++ b/apps/iot/src/http_handler/http_host_handler.erl @@ -86,7 +86,23 @@ handle_request("GET", "/host/detail", #{<<"id">> := HostId}, _) -> Services = lists:map(fun(S) -> service_model:to_map(S) end, Services0), HostInfo2 = maps:put(<<"services">>, Services, HostInfo1), - {ok, 200, iot_util:json_data(HostInfo2)} + %% 获取部署应用场景 + {ok, DeployList0} = scenario_deploy_model:get_host_deploy_list(HostId), + DeployList = lists:map(fun(E) -> scenario_deploy_model:to_map(E) end, DeployList0), + %% 获取部署的场景信息 + DeployList1 = lists:map(fun(DeployInfo = #{<<"scenario_id">> := ScenarioId}) -> + case scenario_model:get_scenario(ScenarioId) of + {ok, Scenario} -> + ScenarioInfo = scenario_model:to_map(Scenario), + DeployInfo#{<<"scenario_info">> => ScenarioInfo}; + undefined -> + DeployInfo#{<<"scenario_info">> => #{}} + end + end, DeployList), + + HostInfo3 = maps:put(<<"deploy_list">>, HostInfo2, DeployList1), + + {ok, 200, iot_util:json_data(HostInfo3)} end; handle_request("POST", "/host/update", _, _Params) -> diff --git a/apps/iot/src/http_handler/http_scenario_handler.erl b/apps/iot/src/http_handler/http_scenario_handler.erl index ab7b741..7f9acfe 100644 --- a/apps/iot/src/http_handler/http_scenario_handler.erl +++ b/apps/iot/src/http_handler/http_scenario_handler.erl @@ -31,58 +31,76 @@ handle_request(_, "/scenario/list", Params, PostParams) -> MatchHead = #scenario{name = '$1', _ = '_'}, Guard = [], - Guard1 = case Model =/= <<"">> of + Guard1 = case Name =/= <<"">> of true -> - [{'=:=', '$1', Model}|Guard]; + [{'=:=', '$1', Name}|Guard]; false -> Guard end, - - Guard2 = case CellId1 > 0 of - true -> - [{'=:=', '$2', CellId1}|Guard1]; - false -> - Guard1 - end, - Result = ['$_'], - case host_model:get_hosts({MatchHead, Guard2, Result}, Start, Size) of - {ok, Hosts, TotalNum} -> + case scenario_model:get_scenario_list({MatchHead, Guard1, Result}, Start, Size) of + {ok, ScenarioList, TotalNum} -> Response = #{ - <<"hosts">> => lists:map(fun(Host) -> host_model:to_map(Host) end, Hosts), - <<"stat">> => host_model:get_stat(), + <<"scenarios">> => lists:map(fun(Host) -> scenario_model:to_map(Host) end, ScenarioList), <<"total_num">> => TotalNum }, - - lager:debug("resp is: ~p", [Response]), - {ok, 200, iot_util:json_data(Response)}; {error, Reason} -> - lager:warning("[host_handler] get a error: ~p", [Reason]), + lager:warning("[http_scenario_handler] get a error: ~p", [Reason]), {ok, 200, iot_util:json_error(404, <<"database error">>)} end; -handle_request("GET", "/host/detail", #{<<"id">> := HostId}, _) -> - lager:debug("[host_handler] detail id is: ~p", [HostId]), - case host_model:get_host(HostId) of +handle_request("GET", "/scenario/detail", #{<<"id">> := ScenarioId0}, _) -> + ScenarioId = binary_to_integer(ScenarioId0), + case scenario_model:get_scenario(ScenarioId) of undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - {ok, Host} -> - HostInfo = host_model:to_map(Host), - %% 获取终端信息 - {ok, Terminals0} = terminal_model:get_host_terminals(HostId), - Terminals = lists:map(fun(E) -> terminal_model:to_map(E) end, Terminals0), - HostInfo1 = maps:put(<<"terminals">>, Terminals, HostInfo), - %% 获取微服务信息 - {ok, Services0} = service_model:get_host_services(HostId), - Services = lists:map(fun(S) -> service_model:to_map(S) end, Services0), - HostInfo2 = maps:put(<<"services">>, Services, HostInfo1), + {ok, 200, iot_util:json_error(404, <<"scenario not found">>)}; + {ok, Scenario} -> + ScenarioInfo = scenario_model:to_map(Scenario), + %% 获取场景下部署的主机列表 + {ok, DeployList0} = scenario_deploy_model:get_scenario_deploy_list(ScenarioId), + DeployList = lists:map(fun(E) -> scenario_deploy_model:to_map(E) end, DeployList0), + ScenarioInfo1 = maps:put(<<"deploy_list">>, ScenarioInfo, DeployList), + %% 获取部署的主机信息 + ScenarioInfo2 = lists:map(fun(Info = #{<<"host_id">> := HostId}) -> + case host_model:get_host(HostId) of + {ok, Host} -> + HostInfo = host_model:to_map(Host), + Info#{<<"host_info">> => HostInfo}; + undefined -> + Info#{<<"host_info">> => #{}} + end + end, ScenarioInfo1), - {ok, 200, iot_util:json_data(HostInfo2)} + {ok, 200, iot_util:json_data(ScenarioInfo2)} end; -handle_request("POST", "/host/update", _, _Params) -> - lager:debug("[host_handler] post params is: ~p", [_Params]), +handle_request("POST", "/scenario/change_status", _, #{<<"scenario_id">> := ScenarioId, <<"status">> := Status}) when is_integer(ScenarioId), is_integer(Status) -> + case scenario_model:change_status(ScenarioId, Status) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} when is_binary(Reason) -> + lager:warning("[http_scenario_handler] change_status get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, Reason)} + end; - {ok, 200, iot_util:json_data(<<"success">>)}. \ No newline at end of file +%% 部署 +handle_request("POST", "/scenario/deploy", _, #{<<"scenario_id">> := ScenarioId, <<"status">> := Status}) when is_integer(ScenarioId), is_integer(Status) -> + case scenario_model:change_status(ScenarioId, Status) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} when is_binary(Reason) -> + lager:warning("[http_scenario_handler] change_status get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, Reason)} + end; + +%% 删除?? +handle_request("POST", "/scenario/change_status", _, #{<<"scenario_id">> := ScenarioId, <<"status">> := Status}) when is_integer(ScenarioId), is_integer(Status) -> + case scenario_model:change_status(ScenarioId, Status) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} when is_binary(Reason) -> + lager:warning("[http_scenario_handler] change_status get a error: ~p", [Reason]), + {ok, 200, iot_util:json_error(404, Reason)} + end. \ No newline at end of file diff --git a/apps/iot/src/model/scenario_model.erl b/apps/iot/src/model/scenario_model.erl index d573465..b3dbcb8 100644 --- a/apps/iot/src/model/scenario_model.erl +++ b/apps/iot/src/model/scenario_model.erl @@ -47,7 +47,7 @@ change_status(ScenarioId, Status) when is_integer(ScenarioId), is_integer(Status Fun = fun() -> case mnesia:read(scenario, ScenarioId) of [] -> - mnesia:abort(<<"host not found">>); + mnesia:abort(<<"scenario not found">>); [Scenario] -> mnesia:write(scenario, Scenario#scenario{status = Status}, write) end From 09c8f7c4ed094281beff25e2d4e7b88b57208a44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 15:43:17 +0800 Subject: [PATCH 21/24] fix --- docs/R.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 docs/R.md diff --git a/docs/R.md b/docs/R.md new file mode 100644 index 0000000..32a21ae --- /dev/null +++ b/docs/R.md @@ -0,0 +1,2 @@ +## +here From 70d624aeb4a544ca1813295cbc96249dbad80031 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 16:47:13 +0800 Subject: [PATCH 22/24] add issue --- apps/iot/include/iot.hrl | 23 ++++++ apps/iot/src/iot_issue.erl | 99 ++++++++++++++++++++++++++ apps/iot/src/iot_issue_sup.erl | 60 ++++++++++++++++ apps/iot/src/iot_mnesia.erl | 27 +++++++ apps/iot/src/iot_sup.erl | 9 +++ apps/iot/src/model/issue_model.erl | 109 +++++++++++++++++++++++++++++ 6 files changed, 327 insertions(+) create mode 100644 apps/iot/src/iot_issue.erl create mode 100644 apps/iot/src/iot_issue_sup.erl create mode 100644 apps/iot/src/model/issue_model.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index d607d14..a3907f0 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -198,6 +198,29 @@ status = 0:: integer() }). +%% 工单管理 +-record(issue, { + issue_id :: integer(), + %% 工单名称 + name :: binary(), + %% 创建用户 + uid :: integer(), + %% 部署类型 + deploy_type :: integer(), + %% 关联的id, 微服务id或者场景id + assoc_id :: any(), + %% 主机的id列表 + hosts = [] :: list(), + %% 超时时间 + timeout = 0 :: integer(), + %% 部署结果 + results = [], + %% 创建时间 + create_ts = 0 :: integer(), + %% 工单状态 + status = 0 :: integer() +}). + -record(http_endpoint, { url = <<>> }). diff --git a/apps/iot/src/iot_issue.erl b/apps/iot/src/iot_issue.erl new file mode 100644 index 0000000..b980933 --- /dev/null +++ b/apps/iot/src/iot_issue.erl @@ -0,0 +1,99 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 10. 3月 2023 16:44 +%%%------------------------------------------------------------------- +-module(iot_issue). +-author("licheng5"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_issue_sup.erl b/apps/iot/src/iot_issue_sup.erl new file mode 100644 index 0000000..0b71ae7 --- /dev/null +++ b/apps/iot/src/iot_issue_sup.erl @@ -0,0 +1,60 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 10. 3月 2023 16:44 +%%%------------------------------------------------------------------- +-module(iot_issue_sup). +-author("licheng5"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%% @doc Starts the supervisor +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +-spec(init(Args :: term()) -> + {ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(), + MaxR :: non_neg_integer(), MaxT :: non_neg_integer()}, + [ChildSpec :: supervisor:child_spec()]}} + | ignore | {error, Reason :: term()}). +init([]) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 1000, period => 3600}, + AChild = #{ + id => 'iot_issue', + start => {'iot_issue', start_link, []}, + restart => temporary, + shutdown => 2000, + type => worker, + modules => ['iot_issue'] + }, + + {ok, {SupFlags, [AChild]}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_mnesia.erl b/apps/iot/src/iot_mnesia.erl index 928b899..82e7f3a 100644 --- a/apps/iot/src/iot_mnesia.erl +++ b/apps/iot/src/iot_mnesia.erl @@ -64,6 +64,30 @@ init_database() -> {type, ordered_set} ]), + %% 应用场景 + mnesia:create_table(scenario, [ + {attributes, record_info(fields, scenario)}, + {record_name, scenario}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + + %% 应用场景部署关系表 + mnesia:create_table(scenario_deploy, [ + {attributes, record_info(fields, scenario_deploy)}, + {record_name, scenario_deploy}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + + %% 工单表 + mnesia:create_table(issue, [ + {attributes, record_info(fields, issue)}, + {record_name, issue}, + {disc_copies, [node()]}, + {type, ordered_set} + ]), + ok. %% 加入集群 @@ -85,4 +109,7 @@ copy_database(MasterNode) when is_atom(MasterNode) -> mnesia:add_table_copy(host, node(), ram_copies), mnesia:add_table_copy(terminal, node(), ram_copies), mnesia:add_table_copy(service, node(), ram_copies), + mnesia:add_table_copy(scenario, node(), ram_copies), + mnesia:add_table_copy(scenario_deploy, node(), ram_copies), + mnesia:add_table_copy(issue, node(), ram_copies), ok. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 924e656..3cb80b4 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -37,6 +37,15 @@ init([]) -> modules => ['iot_router_sup'] }, + #{ + id => 'iot_issue_sup', + start => {'iot_issue_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_issue_sup'] + }, + #{ id => 'iot_emqtt_client', start => {'iot_emqtt_client', start_link, []}, diff --git a/apps/iot/src/model/issue_model.erl b/apps/iot/src/model/issue_model.erl new file mode 100644 index 0000000..7ec209e --- /dev/null +++ b/apps/iot/src/model/issue_model.erl @@ -0,0 +1,109 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 27. 4月 2021 下午4:38 +%%%------------------------------------------------------------------- +-module(issue_model). +-author("licheng5"). +-include("iot.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB_NAME, issue). + +%% API +-export([get_issue/1, get_issues/3, add_issue/1, change_status/2, delete/1, table_size/0, get_user_issues/3]). +-export([to_map/1]). + +get_issue(IssueId) when is_integer(IssueId) -> + case mnesia:dirty_read(?TAB_NAME, IssueId) of + [Issue = #issue{}] -> + {ok, Issue}; + _ -> + undefined + end. + +%% 获取app信息 +-spec get_issues(Filter :: any(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: list(), TotalNum :: integer()} | + {error, Reason :: any()}. +get_issues(Spec, Start, Limit) when is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Items = mnesia:dirty_select(?TAB_NAME, [Spec]), + NItems = lists:sublist(Items, Start + 1, Limit), + {ok, NItems, length(Items)}. + +-spec get_user_issues(UserId :: integer(), Start :: integer(), Limit :: integer()) -> + {ok, Items :: [#host{}], Num :: integer()} | + {error, Reason :: any()}. +get_user_issues(UserId, Start, Limit) when is_integer(UserId), is_integer(Limit), is_integer(Start), Start >= 0, Limit > 0 -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB_NAME), E#issue.uid =:= UserId]), + qlc:e(Q) + end, + case mnesia:transaction(Fun) of + {atomic, Items} when is_list(Items) -> + {ok, lists:sublist(Items, Start + 1, Limit), length(Items)}; + {aborted, Error} -> + {error, Error} + end. + +-spec add_issue(Issue :: #issue{}) -> ok | {error, Reason :: any()}. +add_issue(Issue = #issue{}) -> + case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, Issue, write) end) of + {atomic, _} -> + ok; + {aborted, Error} -> + {error, Error} + end. + +-spec change_status(IssueId :: integer(), Status :: integer()) -> ok | {error, Reason :: any()}. +change_status(IssueId, Status) when is_integer(IssueId), is_integer(Status) -> + Fun = fun() -> + case mnesia:read(?TAB_NAME, IssueId) of + [] -> + mnesia:abort(<<"issue not found">>); + [Issue] -> + mnesia:write(?TAB_NAME, Issue#issue{status = Status}, write) + end + end, + case mnesia:transaction(Fun) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +-spec delete(IssueId :: binary()) -> ok | {error, Reason :: any()}. +delete(IssueId) when is_integer(IssueId) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, IssueId, write) end) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, Reason} + end. + +%% 获取app表的数据大小 +table_size() -> + mnesia:table_info(?TAB_NAME, size). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +to_map(#issue{issue_id = IssueId, name = Name, uid = Uid, deploy_type = DeployType, assoc_id = AssocId, hosts = Hosts, timeout = Timeout, + create_ts = CreateTs, results = Results, status = Status}) -> + + #{ + <<"issue_id">> => IssueId, + <<"name">> => Name, + <<"uid">> => Uid, + <<"deploy_type">> => DeployType, + <<"assoc_id">> => AssocId, + <<"hosts">> => Hosts, + <<"timeout">> => Timeout, + <<"status">> => Status, + <<"create_ts">> => CreateTs, + <<"results">> => Results + }. \ No newline at end of file From 7bdaad201341f37f6c1b18562538e1e1ba8623c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 17:19:15 +0800 Subject: [PATCH 23/24] fix issue --- apps/iot/src/iot_issue.erl | 20 +++++++++++++------- apps/iot/src/iot_issue_sup.erl | 15 ++++++++++++++- apps/iot/src/iot_mock.erl | 14 ++++++++++++++ 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/apps/iot/src/iot_issue.erl b/apps/iot/src/iot_issue.erl index b980933..7091473 100644 --- a/apps/iot/src/iot_issue.erl +++ b/apps/iot/src/iot_issue.erl @@ -8,11 +8,12 @@ %%%------------------------------------------------------------------- -module(iot_issue). -author("licheng5"). +-include("iot.hrl"). -behaviour(gen_server). %% API --export([start_link/0]). +-export([start_link/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -20,18 +21,22 @@ -define(SERVER, ?MODULE). -record(state, { - + issue :: #issue{} }). %%%=================================================================== %%% API %%%=================================================================== +get_name(IssueId) when is_integer(IssueId) -> + list_to_atom("iot_issue:" ++ integer_to_list(IssueId)). + %% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> +-spec(start_link(Issue :: #issue{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(Issue = #issue{issue_id = IssueId}) -> + Name = get_name(IssueId), + gen_server:start_link({local, Name}, ?MODULE, [Issue], []). %%%=================================================================== %%% gen_server callbacks @@ -42,8 +47,9 @@ start_link() -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. +init([Issue]) -> + lager:debug("iot_issue started!!"), + {ok, #state{issue = Issue}}. %% @private %% @doc Handling call messages diff --git a/apps/iot/src/iot_issue_sup.erl b/apps/iot/src/iot_issue_sup.erl index 0b71ae7..9c4a369 100644 --- a/apps/iot/src/iot_issue_sup.erl +++ b/apps/iot/src/iot_issue_sup.erl @@ -8,11 +8,12 @@ %%%------------------------------------------------------------------- -module(iot_issue_sup). -author("licheng5"). +-include("iot.hrl"). -behaviour(supervisor). %% API --export([start_link/0]). +-export([start_link/0, start_issue/1]). %% Supervisor callbacks -export([init/1]). @@ -58,3 +59,15 @@ init([]) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +%% 启动一个任务 +-spec start_issue(Issue :: #issue{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_issue(Issue = #issue{}) -> + case supervisor:start_child(?MODULE, [Issue]) of + {ok, Pid} -> + {ok, Pid}; + {error, {'already_started', Pid}} -> + {ok, Pid}; + Error -> + Error + end. \ No newline at end of file diff --git a/apps/iot/src/iot_mock.erl b/apps/iot/src/iot_mock.erl index 8eb16e3..f0d7b0d 100644 --- a/apps/iot/src/iot_mock.erl +++ b/apps/iot/src/iot_mock.erl @@ -14,6 +14,20 @@ -export([insert_hosts/0, insert_services/1, insert_terminals/1, insert_routers/0]). -export([start_router/1]). -export([rsa_encode/1]). +-export([start_issue/0]). + +start_issue() -> + iot_issue_sup:start_issue(#issue{ + issue_id = 1, + name = <<"issue 1">>, + uid = 1234, + deploy_type = 1, + assoc_id = 1, + hosts = [<<"host1">>, <<"host2">>], + timeout = 6, + create_ts = 0, + status = 0 + }). insert_hosts() -> lists:foreach(fun(Id0) -> From 7877753602e92bd8051d8b2ce4a86c577d57dcff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Fri, 10 Mar 2023 17:29:04 +0800 Subject: [PATCH 24/24] fix issue --- apps/iot/src/iot_issue.erl | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/apps/iot/src/iot_issue.erl b/apps/iot/src/iot_issue.erl index 7091473..08b1c07 100644 --- a/apps/iot/src/iot_issue.erl +++ b/apps/iot/src/iot_issue.erl @@ -14,6 +14,7 @@ %% API -export([start_link/1]). +-export([get_pid/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -21,16 +22,20 @@ -define(SERVER, ?MODULE). -record(state, { - issue :: #issue{} + issue :: #issue{}, + timer_ref }). %%%=================================================================== %%% API %%%=================================================================== + +get_pid(IssueId) when is_integer(IssueId) -> + whereis(get_name(IssueId)). + get_name(IssueId) when is_integer(IssueId) -> list_to_atom("iot_issue:" ++ integer_to_list(IssueId)). - %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Issue :: #issue{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -47,9 +52,18 @@ start_link(Issue = #issue{issue_id = IssueId}) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Issue]) -> - lager:debug("iot_issue started!!"), - {ok, #state{issue = Issue}}. +init([Issue = #issue{timeout = Timeout0, hosts = Hosts}]) -> + lager:debug("iot_issue started!!: ~p", [Issue]), + %% 启动任务定时器, 默认最长为1个小时 + Timeout = if Timeout0 > 0 -> Timeout0; true -> 3600 end, + TimerRef = erlang:start_timer(Timeout * 1000, self(), issue_task_timeout), + + %% TODO 下发数据到主机 + lists:map(fun(Host) -> + iot_emqtt_client:publish(Host, x, 1) + end, Hosts), + + {ok, #state{issue = Issue, timer_ref = TimerRef}}. %% @private %% @doc Handling call messages