fix
This commit is contained in:
parent
0a229bbbb4
commit
cfe88b4975
@ -95,7 +95,7 @@
|
|||||||
|
|
||||||
%% 北向数据
|
%% 北向数据
|
||||||
-record(north_data, {
|
-record(north_data, {
|
||||||
id :: integer(),
|
id = 0 :: integer(),
|
||||||
location_code :: binary(),
|
location_code :: binary(),
|
||||||
body :: binary()
|
body :: binary()
|
||||||
}).
|
}).
|
||||||
@ -84,10 +84,10 @@ handle_request("POST", "/endpoint/delete", _, #{<<"name">> := Name}) when is_bin
|
|||||||
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
|
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
|
||||||
Pid ->
|
Pid ->
|
||||||
iot_endpoint:clean_up(Pid),
|
iot_endpoint:clean_up(Pid),
|
||||||
iot_endpoint_sup:delete_endpoint(Name)
|
iot_endpoint_sup:delete_endpoint(Name),
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
{ok, 200, iot_util:json_data(<<"success">>)}
|
||||||
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[endpoint_handler] delete endpoint id: ~p, get error is: ~p", [Name, Reason]),
|
lager:debug("[endpoint_handler] delete endpoint id: ~p, get error is: ~p", [Name, Reason]),
|
||||||
{ok, 200, iot_util:json_error(404, <<"error">>)}
|
{ok, 200, iot_util:json_error(404, <<"error">>)}
|
||||||
|
|||||||
@ -89,12 +89,16 @@ init([Endpoint = #endpoint{name = Name}]) ->
|
|||||||
|
|
||||||
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||||
erlang:start_timer(0, self(), create_postman),
|
erlang:start_timer(0, self(), create_postman),
|
||||||
%% 创建自己的队列数据库表
|
try
|
||||||
TabName = binary_to_atom(<<"queue_data:", Name/binary>>),
|
%% 创建自己的队列数据库表
|
||||||
%% 如果表已经存在需要等待表数据加载完成
|
TabName = binary_to_atom(<<"queue_data:", Name/binary>>),
|
||||||
mnesia_queue:ensure_queue(TabName),
|
mnesia_queue:ensure_queue(TabName),
|
||||||
|
|
||||||
{ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}}.
|
{ok, disconnected, #state{endpoint = Endpoint, tab_name = TabName, postman_pid = undefined}}
|
||||||
|
catch _:Error ->
|
||||||
|
lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p", [Name, Error]),
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_statem when it needs to find out
|
%% @doc This function is called by a gen_statem when it needs to find out
|
||||||
|
|||||||
@ -56,7 +56,7 @@ init([]) ->
|
|||||||
mnesia:subscribe({table, endpoint, simple}),
|
mnesia:subscribe({table, endpoint, simple}),
|
||||||
ets:new(endpoint_cache, [public, set, named_table, {keypos, 1}]),
|
ets:new(endpoint_cache, [public, set, named_table, {keypos, 1}]),
|
||||||
%% 加载信息到缓存中
|
%% 加载信息到缓存中
|
||||||
load_endpoints(),
|
catch load_endpoints(),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -87,13 +87,13 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State = #state{}) ->
|
handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State) ->
|
||||||
lager:debug("[iot_endpoint_monitor] delete_event endpoint name: ~p, tid: ~p", [Name, Tid]),
|
lager:debug("[iot_endpoint_monitor] delete_event endpoint name: ~p, tid: ~p", [Name, Tid]),
|
||||||
iot_endpoint_sup:delete_endpoint(Name),
|
iot_endpoint_sup:delete_endpoint(Name),
|
||||||
load_endpoints(),
|
catch load_endpoints(),
|
||||||
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State = #state{}) ->
|
handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State) ->
|
||||||
lager:debug("[iot_endpoint_monitor] write_event new endpoint: ~p, tid: ~p", [Endpoint, Tid]),
|
lager:debug("[iot_endpoint_monitor] write_event new endpoint: ~p, tid: ~p", [Endpoint, Tid]),
|
||||||
case iot_endpoint:get_pid(Name) of
|
case iot_endpoint:get_pid(Name) of
|
||||||
undefined ->
|
undefined ->
|
||||||
@ -102,7 +102,7 @@ handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}
|
|||||||
Pid when is_pid(Pid) ->
|
Pid when is_pid(Pid) ->
|
||||||
iot_endpoint:reload(Pid, Endpoint)
|
iot_endpoint:reload(Pid, Endpoint)
|
||||||
end,
|
end,
|
||||||
load_endpoints(),
|
catch load_endpoints(),
|
||||||
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|||||||
@ -24,8 +24,7 @@ get_all_endpoints() ->
|
|||||||
case mnesia:transaction(Fun) of
|
case mnesia:transaction(Fun) of
|
||||||
{atomic, Endpoints} ->
|
{atomic, Endpoints} ->
|
||||||
Endpoints;
|
Endpoints;
|
||||||
{aborted, Reason} ->
|
{aborted, _} ->
|
||||||
lager:warning("[mnesia_endpoint] get_all_endpoints get a error: ~p", [Reason]),
|
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|||||||
@ -19,7 +19,7 @@ test() ->
|
|||||||
{ok, Tokens, _} = erl_scan:string(binary_to_list(Content)),
|
{ok, Tokens, _} = erl_scan:string(binary_to_list(Content)),
|
||||||
{ok, ExprList} = erl_parse:parse_exprs(Tokens),
|
{ok, ExprList} = erl_parse:parse_exprs(Tokens),
|
||||||
|
|
||||||
{value, F, NewBindings} = erl_eval:exprs(ExprList, []),
|
{value, F, _NewBindings} = erl_eval:exprs(ExprList, []),
|
||||||
F(#{name => <<"test">>}).
|
F(#{name => <<"test">>}).
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,119 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 05. 7月 2023 23:22
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(iot_endpoint_mocker).
|
|
||||||
-author("aresei").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/0, test/0]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
test() ->
|
|
||||||
{ok, Pid} = start_link(),
|
|
||||||
Data = #{
|
|
||||||
<<"name">> => <<"anlicheng">>
|
|
||||||
},
|
|
||||||
gen_server:cast(Pid, {forward, Data}),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link() ->
|
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link(?MODULE, [], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([]) ->
|
|
||||||
{ok, #state{}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast({forward, Data}, State = #state{}) ->
|
|
||||||
Name = <<"zhongguodianli">>,
|
|
||||||
Pid = iot_endpoint:get_pid(Name),
|
|
||||||
Body = jiffy:encode(Data),
|
|
||||||
|
|
||||||
iot_endpoint:forward(Pid, <<"abc123">>, Body),
|
|
||||||
|
|
||||||
erlang:start_timer(5000, self(), {resend, Pid, Body, 1}),
|
|
||||||
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info({timeout, _, {resend, Pid, Body, Id}}, State = #state{}) ->
|
|
||||||
iot_endpoint:forward(Pid, <<"abc123">>, jiffy:encode(#{<<"id">> => Id})),
|
|
||||||
erlang:start_timer(5000, self(), {resend, Pid, Body, Id + 1}),
|
|
||||||
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(_Reason, _State = #state{}) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
Loading…
x
Reference in New Issue
Block a user