diff --git a/apps/iot/src/data_format/line_format.erl b/apps/iot/src/data_format/line_format.erl index 775ad3c..53d865b 100644 --- a/apps/iot/src/data_format/line_format.erl +++ b/apps/iot/src/data_format/line_format.erl @@ -13,7 +13,7 @@ -export([parse/1]). %% <<"cpu,name=xyz,uuid=\"this,=isx=test,\" key=345,key1=23.5,key2=yes 123457">>, --spec parse(Metric :: binary()) -> map(). +-spec parse(Metric :: binary()) -> error | {ok, map()}. parse(Metric) when is_binary(Metric) -> case catch lexer(Metric) of [[Measurement|Tags], Fields, [Timestamp|_]] -> diff --git a/apps/iot/src/database/micro_service_bo.erl b/apps/iot/src/database/micro_service_bo.erl index 8f54008..03a2b86 100644 --- a/apps/iot/src/database/micro_service_bo.erl +++ b/apps/iot/src/database/micro_service_bo.erl @@ -11,6 +11,13 @@ -export([get_service_config/1]). %% API --spec get_service_config(ServiceId :: binary()) -> {ok, DeviceInfo :: map()} | undefined. + +%% TODO +-spec get_service_config(ServiceId :: binary()) -> {ok, ConfigJson :: binary()} | error. get_service_config(ServiceId) when is_binary(ServiceId) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]). \ No newline at end of file + case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]) of + undefined -> + error; + {ok, #{<<"config">> := Config}} -> + {ok, Config} + end. \ No newline at end of file diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 8a47ac1..0026b97 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -13,33 +13,42 @@ %% API -export([start_link/1]). -export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]). +-export([get_alias_pid/1]). +-export([config_equals/2]). %%%=================================================================== %%% API %%%=================================================================== -spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}. -start_link(Endpoint = #endpoint{name = Name0, config = #http_endpoint{}}) -> - Name = get_name(Name0), - endpoint_http:start_link(Name, Endpoint); -start_link(Endpoint = #endpoint{name = Name0, config = #mqtt_endpoint{}}) -> - Name = get_name(Name0), - endpoint_mqtt:start_link(Name, Endpoint); -start_link(Endpoint = #endpoint{name = Name0, config = #mysql_endpoint{}}) -> - Name = get_name(Name0), - endpoint_mysql:start_link(Name, Endpoint). +start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}}) -> + LocalName = get_name(Id), + AliasName = get_alias_name(Name), + endpoint_http:start_link(LocalName, AliasName, Endpoint); +start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) -> + LocalName = get_name(Id), + AliasName = get_alias_name(Name), + endpoint_mqtt:start_link(LocalName, AliasName, Endpoint); +start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mysql_endpoint{}}) -> + LocalName = get_name(Id), + AliasName = get_alias_name(Name), + endpoint_mysql:start_link(LocalName, AliasName, Endpoint). --spec get_name(Name :: binary()) -> atom(). -get_name(Name) when is_binary(Name) -> +-spec get_name(Id :: integer()) -> atom(). +get_name(Id) when is_integer(Id) -> + list_to_atom("endpoint:" ++ integer_to_list(Id)). + +-spec get_pid(Id :: integer()) -> undefined | pid(). +get_pid(Id) when is_binary(Id) -> + whereis(get_name(Id)). + +get_alias_name(Name) when is_binary(Name) -> list_to_atom("endpoint:" ++ binary_to_list(Name)). --spec get_pid(Name :: binary()) -> undefined | pid(). -get_pid(Name) when is_binary(Name) -> - whereis(get_name(Name)). +get_alias_pid(Name) when is_binary(Name) -> + global:whereis_name(get_alias_name(Name)). --spec forward(Pid :: undefined | pid(), ServiceId :: binary(), Format :: binary(), Metric :: integer()) -> no_return(). -forward(undefined, _, _, _) -> - ok; +-spec forward(Pid :: pid(), ServiceId :: binary(), Format :: binary(), Metric :: binary()) -> no_return(). forward(Pid, ServiceId, Format, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Format), is_binary(Metric) -> gen_server:cast(Pid, {forward, ServiceId, Format, Metric}). diff --git a/apps/iot/src/endpoint/endpoint_http.erl b/apps/iot/src/endpoint/endpoint_http.erl index 5121d0c..906536a 100644 --- a/apps/iot/src/endpoint/endpoint_http.erl +++ b/apps/iot/src/endpoint/endpoint_http.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/2]). +-export([start_link/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -30,10 +30,10 @@ %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Name :: atom(), Endpoint :: #endpoint{}) -> +-spec(start_link(LocalName :: atom(), AliasName :: atom(), Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(Name) -> - gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). +start_link(LocalName, AliasName, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(LocalName), is_atom(AliasName) -> + gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). %%%=================================================================== %%% gen_server callbacks @@ -44,8 +44,9 @@ start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(N -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Endpoint]) -> +init([AliasName, Endpoint]) -> Buffer = endpoint_buffer:new(Endpoint, 10), + iot_name_server:register(AliasName, self()), {ok, #state{endpoint = Endpoint, buffer = Buffer}}. %% @private diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index 787fefc..86a315f 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/2]). +-export([start_link/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,8 +41,8 @@ %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> - gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). +start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) -> + gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). %%%=================================================================== %%% gen_statem callbacks @@ -52,7 +52,8 @@ start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([Endpoint]) -> +init([AliasName, Endpoint]) -> + iot_name_server:register(AliasName, self()), erlang:process_flag(trap_exit, true), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), diff --git a/apps/iot/src/endpoint/endpoint_mysql.erl b/apps/iot/src/endpoint/endpoint_mysql.erl index 077f43c..45812b6 100644 --- a/apps/iot/src/endpoint/endpoint_mysql.erl +++ b/apps/iot/src/endpoint/endpoint_mysql.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/2]). +-export([start_link/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -39,8 +39,8 @@ %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> - gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []). +start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) -> + gen_statem:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). %%%=================================================================== %%% gen_statem callbacks @@ -50,7 +50,8 @@ start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([Endpoint]) -> +init([AliasName, Endpoint]) -> + iot_name_server:register(AliasName, self()), erlang:process_flag(trap_exit, true), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), @@ -129,13 +130,18 @@ handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> handle_info({next_data, Id, ServiceId, Format, Metric}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table, fields_map = FieldsMap}}}) -> - {ok, InsertSql, Values} = insert_sql(Table, ServiceId, Format, FieldsMap, Metric), - case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of - ok -> - NBuffer = endpoint_buffer:ack(Id, Buffer), - {noreply, State#state{buffer = NBuffer}}; - Error -> - lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]), + case insert_sql(Table, ServiceId, Format, FieldsMap, Metric) of + {ok, InsertSql, Values} -> + case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of + ok -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; + Error -> + lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]), + {noreply, State} + end; + error -> + lager:debug("[endpoint_mysql] endpoint: ~p, make sql error", [Title]), {noreply, State} end; @@ -176,7 +182,8 @@ code_change(_OldVsn, State = #state{}, _Extra) -> retry_connect() -> erlang:start_timer(?RETRY_INTERVAL, self(), create_postman). --spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> {ok, Sql :: binary(), Values :: list()}. +-spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> + error | {ok, Sql :: binary(), Values :: list()}. insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) -> case line_format:parse(Metric) of error -> @@ -184,16 +191,16 @@ insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table {ok, #{<<"measurement">> := Measurement, <<"tags">> := Tags, <<"fields">> := Fields, <<"timestamp">> := Timestamp}} -> Map = maps:merge(Tags, Fields), NMap = Map#{<<"measurement">> => Measurement, <<"timestamp">> => Timestamp}, - lists:flatmap(fun({TableField, F}) -> + TableFields = lists:flatmap(fun({TableField, F}) -> case maps:find(F, NMap) of error -> []; {ok, Val} -> [{TableField, Val}] end - end, maps:to_list(FieldsMap)), + end, maps:to_list(FieldsMap)), - {Keys, Values} = kvs(Fields), + {Keys, Values} = kvs(TableFields), FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), Placeholders = lists:duplicate(length(Keys), <<"?">>), ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), diff --git a/apps/iot/src/http_handlers/endpoint_handler.erl b/apps/iot/src/http_handlers/endpoint_handler.erl index 18ea043..6d34389 100644 --- a/apps/iot/src/http_handlers/endpoint_handler.erl +++ b/apps/iot/src/http_handlers/endpoint_handler.erl @@ -33,13 +33,18 @@ handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer( case endpoint_bo:get_endpoint(Id) of undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint = #endpoint{name = Name}} -> - case endpoint_sup:ensured_endpoint_started(Endpoint) of - {ok, Pid} when is_pid(Pid) -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"start endpoint error">>)} + {ok, EndpointInfo} -> + case endpoint_bo:endpoint_record(EndpointInfo) of + {ok, Endpoint = #endpoint{name = Name}} -> + case endpoint_sup:ensured_endpoint_started(Endpoint) of + {ok, Pid} when is_pid(Pid) -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"start endpoint error">>)} + end; + error -> + {ok, 200, iot_util:json_error(404, <<"endpoint invalid">>)} end end; @@ -61,19 +66,11 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege case endpoint_bo:get_endpoint(Id) of undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint = #endpoint{name = Name}} -> - case endpoint:get_pid(Id) of - undefined -> - case endpoint_sup:ensured_endpoint_started(Endpoint) of - {ok, Pid} when is_pid(Pid) -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)} - end; - Pid -> - case endpoint_sup:delete_endpoint(Id) of - ok -> + {ok, EndpointInfo} -> + case endpoint_bo:endpoint_record(EndpointInfo) of + {ok, Endpoint = #endpoint{name = Name}} -> + case endpoint:get_pid(Id) of + undefined -> case endpoint_sup:ensured_endpoint_started(Endpoint) of {ok, Pid} when is_pid(Pid) -> {ok, 200, iot_util:json_data(<<"success">>)}; @@ -81,10 +78,23 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)} end; - {error, Reason} -> - lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)} - end + Pid -> + case endpoint_sup:delete_endpoint(Id) of + ok -> + case endpoint_sup:ensured_endpoint_started(Endpoint) of + {ok, Pid} when is_pid(Pid) -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)} + end; + {error, Reason} -> + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)} + end + end; + error -> + {ok, 200, iot_util:json_error(404, <<"endpoint invalid">>)} end end; diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 000642f..a4c9956 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -18,7 +18,6 @@ mysql, gpb, esockd, - endpoint, mnesia, crypto, public_key, diff --git a/apps/iot/src/iot_name_server.erl b/apps/iot/src/iot_name_server.erl new file mode 100644 index 0000000..8bbc0c7 --- /dev/null +++ b/apps/iot/src/iot_name_server.erl @@ -0,0 +1,137 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 17. 8月 2025 00:26 +%%%------------------------------------------------------------------- +-module(iot_name_server). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([whereis/1, register/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(TAB, iot_name_server). + +-record(state, { + %% #{Pid => Name} + pid_names = #{}, + refs = [] +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec register(Name :: atom(), Pid :: pid()) -> ok. +register(Name, Pid) when is_atom(Name), is_pid(Pid) -> + gen_server:call(?SERVER, {register, Name, Pid}). + +-spec whereis(Name :: atom()) -> undefined | pid(). +whereis(Name) when is_atom(Name) -> + case ets:lookup(?TAB, Name) of + [] -> + undefined; + [{Name, Pid}|_] -> + case is_process_alive(Pid) of + true -> + Pid; + false -> + undefined + end + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 初始化存储 + ets:new(?TAB, [named_table, ordered_set, public, {keypos, 1}]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({register, Name, Pid}, _From, State = #state{refs = Refs, pid_names = PidNames}) -> + true = ets:insert(?TAB, {Name, Pid}), + MRef = erlang:monitor(process, Pid), + {reply, ok, State#state{refs = [MRef|Refs], pid_names = maps:put(Pid, Name, PidNames)}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({'DOWN', MRef, process, Pid, Reason}, State = #state{refs = Refs, pid_names = PidNames}) -> + lager:debug("[iot_name_server] pid: ~p, down with reason: ~p", [Reason]), + case lists:member(MRef, Refs) of + true -> + case maps:take(Pid, PidNames) of + error -> + {noreply, State#state{refs = lists:delete(MRef, Refs)}}; + {Name, NPidNames} -> + true = ets:delete(?TAB, Name), + {noreply, State#state{pid_names = NPidNames, refs = lists:delete(MRef, Refs)}} + end; + false -> + {noreply, State} + end. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 413a134..5e60842 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,6 +28,15 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ + #{ + id => 'iot_name_server', + start => {'iot_name_server', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_name_server'] + }, + #{ id => 'iot_endpoint_sup', start => {'iot_endpoint_sup', start_link, []},