From ecf0646ec77abf2638b406f9e2343f6acfc5b798 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Fri, 18 Aug 2023 11:59:39 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/src/http_handler/endpoint_handler.erl | 10 ++ apps/iot/src/iot_endpoint.erl | 29 ++-- apps/iot/src/iot_endpoint_monitor.erl | 134 ------------------ apps/iot/src/iot_router.erl | 22 +-- apps/iot/src/iot_sup.erl | 9 -- apps/iot/src/mnesia/mnesia_endpoint.erl | 6 +- 6 files changed, 40 insertions(+), 170 deletions(-) delete mode 100644 apps/iot/src/iot_endpoint_monitor.erl diff --git a/apps/iot/src/http_handler/endpoint_handler.erl b/apps/iot/src/http_handler/endpoint_handler.erl index fab5c10..b0caddc 100644 --- a/apps/iot/src/http_handler/endpoint_handler.erl +++ b/apps/iot/src/http_handler/endpoint_handler.erl @@ -51,6 +51,9 @@ handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) -> Endpoint0 = make_endpoint(maps:to_list(Params), #endpoint{}), Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()}, ok = mnesia_endpoint:insert(Endpoint), + %% 重新启动endpoint + {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint), + {ok, 200, iot_util:json_data(<<"success">>)}; {ok, _} -> {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)} @@ -68,6 +71,13 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh NEndpoint1 = NEndpoint#endpoint{updated_at = iot_util:timestamp_of_seconds()}, ok = mnesia_endpoint:insert(NEndpoint1), + case iot_endpoint:get_pid(Name) of + undefined -> + {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint); + Pid when is_pid(Pid) -> + iot_endpoint:reload(Pid, Endpoint) + end, + {ok, 200, iot_util:json_data(<<"success">>)} end; diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index d18ddf7..8b0bf56 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -24,6 +24,7 @@ -record(state, { endpoint :: #endpoint{}, + mp, postman_pid :: undefined | pid(), %% 队列数据库名, 写入到对端都有可能失败,因此缓存队列需要自己维护 tab_name :: atom(), @@ -88,9 +89,11 @@ start_link(Name, Endpoint = #endpoint{}) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([Endpoint = #endpoint{name = Name}]) -> +init([Endpoint = #endpoint{name = Name, matcher = Regexp}]) -> erlang:process_flag(trap_exit, true), + %% 编译正则表达式 + {ok, MP} = re:compile(Regexp), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), try @@ -98,7 +101,7 @@ init([Endpoint = #endpoint{name = Name}]) -> TabName = binary_to_atom(<<"queue_data:", Name/binary>>), mnesia_queue:ensure_queue(TabName), - {ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}} + {ok, disconnected, #state{endpoint = Endpoint, mp = MP, tab_name = TabName, postman_pid = undefined}} catch _:Error -> lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p, ignore", [Name, Error]), ignore @@ -140,14 +143,20 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat {next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}} end; -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum}) -> - mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso FlightNum < WindowSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State, Actions}; +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{mp = MP, tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name}}) -> + case re:run(LocationCode, MP, [{capture, all, list}]) of + nomatch -> + {keep_state, State}; + {match, _} -> + lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]), + mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso FlightNum < WindowSize of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State, Actions} + end; %% 触发读取下一条数据 handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) -> diff --git a/apps/iot/src/iot_endpoint_monitor.erl b/apps/iot/src/iot_endpoint_monitor.erl deleted file mode 100644 index fa1d546..0000000 --- a/apps/iot/src/iot_endpoint_monitor.erl +++ /dev/null @@ -1,134 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 25. 7月 2023 16:39 -%%%------------------------------------------------------------------- --module(iot_endpoint_monitor). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/0]). --export([get_all_endpoints/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -get_all_endpoints() -> - case ets:lookup(endpoint_cache, endpoints) of - [] -> - []; - [{_, Endpoints} | _] -> - Endpoints - end. - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - mnesia:subscribe({table, endpoint, simple}), - ets:new(endpoint_cache, [public, set, named_table, {keypos, 1}]), - %% 加载信息到缓存中 - catch load_endpoints(), - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State) -> - lager:debug("[iot_endpoint_monitor] delete_event endpoint name: ~p, tid: ~p", [Name, Tid]), - iot_endpoint_sup:delete_endpoint(Name), - catch load_endpoints(), - - {noreply, State}; -handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State) -> - lager:debug("[iot_endpoint_monitor] write_event new endpoint: ~p, tid: ~p", [Endpoint, Tid]), - case iot_endpoint:get_pid(Name) of - undefined -> - %% 重新启动endpoint - {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint); - Pid when is_pid(Pid) -> - iot_endpoint:reload(Pid, Endpoint) - end, - catch load_endpoints(), - - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec load_endpoints() -> no_return(). -load_endpoints() -> - Endpoints = mnesia_endpoint:get_all_endpoints(), - true = ets:insert(endpoint_cache, {endpoints, Endpoints}). \ No newline at end of file diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index bc605c4..d1ec86d 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -27,19 +27,9 @@ route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fi -spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok. route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) -> - Endpoints = iot_endpoint_monitor:get_all_endpoints(), - router0(Endpoints, LocationCode, Fields, Timestamp). -router0([], _, _, _) -> - ok; -router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields, Timestamp) -> - {ok, MP} = re:compile(Regexp), - case re:run(LocationCode, MP, [{capture, all, list}]) of - nomatch -> - router0(Endpoints, LocationCode, Fields, Timestamp); - {match, _} -> - lager:debug("[iot_router] match endpoint: ~p", [Name]), - Pid = iot_endpoint:get_pid(Name), - iot_endpoint:forward(Pid, LocationCode, Fields, Timestamp), - %% 继续匹配其他的Endpoint - router0(Endpoints, LocationCode, Fields, Timestamp) - end. \ No newline at end of file + [begin + Pid = iot_endpoint:get_pid(EndpointName), + iot_endpoint:forward(Pid, LocationCode, Fields, Timestamp) + end || EndpointName <- mnesia_endpoint:get_keys()], + ok. + diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 96707ee..bd20919 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,15 +28,6 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ - #{ - id => 'iot_endpoint_monitor', - start => {'iot_endpoint_monitor', start_link, []}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => ['iot_endpoint_monitor'] - }, - #{ id => 'iot_endpoint_sup', start => {'iot_endpoint_sup', start_link, []}, diff --git a/apps/iot/src/mnesia/mnesia_endpoint.erl b/apps/iot/src/mnesia/mnesia_endpoint.erl index ffe2f4a..9c89c9f 100644 --- a/apps/iot/src/mnesia/mnesia_endpoint.erl +++ b/apps/iot/src/mnesia/mnesia_endpoint.erl @@ -12,9 +12,13 @@ -include("iot.hrl"). %% API --export([get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]). +-export([get_keys/0, get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]). -export([to_map/1, config_equals/2]). +-spec get_keys() -> [Name :: binary()]. +get_keys() -> + mnesia:dirty_all_keys(endpoint). + -spec get_all_endpoints() -> [#endpoint{}]. get_all_endpoints() -> Fun = fun() ->