diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index dade9f3..0aa5206 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -95,7 +95,7 @@ %% 北向数据 -record(north_data, { - id :: integer(), + id = 0 :: integer(), location_code :: binary(), body :: binary() }). \ No newline at end of file diff --git a/apps/iot/src/http_handler/endpoint_handler.erl b/apps/iot/src/http_handler/endpoint_handler.erl index ba77a9a..e774f96 100644 --- a/apps/iot/src/http_handler/endpoint_handler.erl +++ b/apps/iot/src/http_handler/endpoint_handler.erl @@ -84,10 +84,10 @@ handle_request("POST", "/endpoint/delete", _, #{<<"name">> := Name}) when is_bin {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; Pid -> iot_endpoint:clean_up(Pid), - iot_endpoint_sup:delete_endpoint(Name) - end, + iot_endpoint_sup:delete_endpoint(Name), - {ok, 200, iot_util:json_data(<<"success">>)}; + {ok, 200, iot_util:json_data(<<"success">>)} + end; {error, Reason} -> lager:debug("[endpoint_handler] delete endpoint id: ~p, get error is: ~p", [Name, Reason]), {ok, 200, iot_util:json_error(404, <<"error">>)} diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index b58494e..ee67f98 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -89,12 +89,16 @@ init([Endpoint = #endpoint{name = Name}]) -> %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), - %% 创建自己的队列数据库表 - TabName = binary_to_atom(<<"queue_data:", Name/binary>>), - %% 如果表已经存在需要等待表数据加载完成 - mnesia_queue:ensure_queue(TabName), + try + %% 创建自己的队列数据库表 + TabName = binary_to_atom(<<"queue_data:", Name/binary>>), + mnesia_queue:ensure_queue(TabName), - {ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}}. + {ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}} + catch _:Error -> + lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p", [Name, Error]), + ignore + end. %% @private %% @doc This function is called by a gen_statem when it needs to find out diff --git a/apps/iot/src/iot_endpoint_monitor.erl b/apps/iot/src/iot_endpoint_monitor.erl index a6c9e85..fa1d546 100644 --- a/apps/iot/src/iot_endpoint_monitor.erl +++ b/apps/iot/src/iot_endpoint_monitor.erl @@ -56,7 +56,7 @@ init([]) -> mnesia:subscribe({table, endpoint, simple}), ets:new(endpoint_cache, [public, set, named_table, {keypos, 1}]), %% 加载信息到缓存中 - load_endpoints(), + catch load_endpoints(), {ok, #state{}}. %% @private @@ -87,13 +87,13 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State = #state{}) -> +handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State) -> lager:debug("[iot_endpoint_monitor] delete_event endpoint name: ~p, tid: ~p", [Name, Tid]), iot_endpoint_sup:delete_endpoint(Name), - load_endpoints(), + catch load_endpoints(), {noreply, State}; -handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State = #state{}) -> +handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State) -> lager:debug("[iot_endpoint_monitor] write_event new endpoint: ~p, tid: ~p", [Endpoint, Tid]), case iot_endpoint:get_pid(Name) of undefined -> @@ -102,7 +102,7 @@ handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid} Pid when is_pid(Pid) -> iot_endpoint:reload(Pid, Endpoint) end, - load_endpoints(), + catch load_endpoints(), {noreply, State}. diff --git a/apps/iot/src/mnesia/mnesia_endpoint.erl b/apps/iot/src/mnesia/mnesia_endpoint.erl index 126e0e3..34828be 100644 --- a/apps/iot/src/mnesia/mnesia_endpoint.erl +++ b/apps/iot/src/mnesia/mnesia_endpoint.erl @@ -24,8 +24,7 @@ get_all_endpoints() -> case mnesia:transaction(Fun) of {atomic, Endpoints} -> Endpoints; - {aborted, Reason} -> - lager:warning("[mnesia_endpoint] get_all_endpoints get a error: ~p", [Reason]), + {aborted, _} -> [] end. diff --git a/apps/iot/src/mocker/eval_test.erl b/apps/iot/src/mocker/eval_test.erl index a7963b0..0268655 100644 --- a/apps/iot/src/mocker/eval_test.erl +++ b/apps/iot/src/mocker/eval_test.erl @@ -19,7 +19,7 @@ test() -> {ok, Tokens, _} = erl_scan:string(binary_to_list(Content)), {ok, ExprList} = erl_parse:parse_exprs(Tokens), - {value, F, NewBindings} = erl_eval:exprs(ExprList, []), + {value, F, _NewBindings} = erl_eval:exprs(ExprList, []), F(#{name => <<"test">>}). diff --git a/apps/iot/src/mocker/iot_endpoint_mocker.erl b/apps/iot/src/mocker/iot_endpoint_mocker.erl deleted file mode 100644 index 179c2a5..0000000 --- a/apps/iot/src/mocker/iot_endpoint_mocker.erl +++ /dev/null @@ -1,119 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 05. 7月 2023 23:22 -%%%------------------------------------------------------------------- --module(iot_endpoint_mocker). --author("aresei"). - --behaviour(gen_server). - -%% API --export([start_link/0, test/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -test() -> - {ok, Pid} = start_link(), - Data = #{ - <<"name">> => <<"anlicheng">> - }, - gen_server:cast(Pid, {forward, Data}), - ok. - - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link(?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, Data}, State = #state{}) -> - Name = <<"zhongguodianli">>, - Pid = iot_endpoint:get_pid(Name), - Body = jiffy:encode(Data), - - iot_endpoint:forward(Pid, <<"abc123">>, Body), - - erlang:start_timer(5000, self(), {resend, Pid, Body, 1}), - - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, {resend, Pid, Body, Id}}, State = #state{}) -> - iot_endpoint:forward(Pid, <<"abc123">>, jiffy:encode(#{<<"id">> => Id})), - erlang:start_timer(5000, self(), {resend, Pid, Body, Id + 1}), - - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%===================================================================