add name server

This commit is contained in:
anlicheng 2025-08-17 00:49:37 +08:00
parent 4495fd0774
commit 9599185742
10 changed files with 249 additions and 69 deletions

View File

@ -13,7 +13,7 @@
-export([parse/1]).
%% <<"cpu,name=xyz,uuid=\"this,=isx=test,\" key=345,key1=23.5,key2=yes 123457">>,
-spec parse(Metric :: binary()) -> map().
-spec parse(Metric :: binary()) -> error | {ok, map()}.
parse(Metric) when is_binary(Metric) ->
case catch lexer(Metric) of
[[Measurement|Tags], Fields, [Timestamp|_]] ->

View File

@ -11,6 +11,13 @@
-export([get_service_config/1]).
%% API
-spec get_service_config(ServiceId :: binary()) -> {ok, DeviceInfo :: map()} | undefined.
%% TODO
-spec get_service_config(ServiceId :: binary()) -> {ok, ConfigJson :: binary()} | error.
get_service_config(ServiceId) when is_binary(ServiceId) ->
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]).
case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]) of
undefined ->
error;
{ok, #{<<"config">> := Config}} ->
{ok, Config}
end.

View File

@ -13,33 +13,42 @@
%% API
-export([start_link/1]).
-export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]).
-export([get_alias_pid/1]).
-export([config_equals/2]).
%%%===================================================================
%%% API
%%%===================================================================
-spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}.
start_link(Endpoint = #endpoint{name = Name0, config = #http_endpoint{}}) ->
Name = get_name(Name0),
endpoint_http:start_link(Name, Endpoint);
start_link(Endpoint = #endpoint{name = Name0, config = #mqtt_endpoint{}}) ->
Name = get_name(Name0),
endpoint_mqtt:start_link(Name, Endpoint);
start_link(Endpoint = #endpoint{name = Name0, config = #mysql_endpoint{}}) ->
Name = get_name(Name0),
endpoint_mysql:start_link(Name, Endpoint).
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_http:start_link(LocalName, AliasName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_mqtt:start_link(LocalName, AliasName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mysql_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_mysql:start_link(LocalName, AliasName, Endpoint).
-spec get_name(Name :: binary()) -> atom().
get_name(Name) when is_binary(Name) ->
-spec get_name(Id :: integer()) -> atom().
get_name(Id) when is_integer(Id) ->
list_to_atom("endpoint:" ++ integer_to_list(Id)).
-spec get_pid(Id :: integer()) -> undefined | pid().
get_pid(Id) when is_binary(Id) ->
whereis(get_name(Id)).
get_alias_name(Name) when is_binary(Name) ->
list_to_atom("endpoint:" ++ binary_to_list(Name)).
-spec get_pid(Name :: binary()) -> undefined | pid().
get_pid(Name) when is_binary(Name) ->
whereis(get_name(Name)).
get_alias_pid(Name) when is_binary(Name) ->
global:whereis_name(get_alias_name(Name)).
-spec forward(Pid :: undefined | pid(), ServiceId :: binary(), Format :: binary(), Metric :: integer()) -> no_return().
forward(undefined, _, _, _) ->
ok;
-spec forward(Pid :: pid(), ServiceId :: binary(), Format :: binary(), Metric :: binary()) -> no_return().
forward(Pid, ServiceId, Format, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Format), is_binary(Metric) ->
gen_server:cast(Pid, {forward, ServiceId, Format, Metric}).

View File

@ -13,7 +13,7 @@
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([start_link/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -30,10 +30,10 @@
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), Endpoint :: #endpoint{}) ->
-spec(start_link(LocalName :: atom(), AliasName :: atom(), Endpoint :: #endpoint{}) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(Name) ->
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
start_link(LocalName, AliasName, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(LocalName), is_atom(AliasName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).
%%%===================================================================
%%% gen_server callbacks
@ -44,8 +44,9 @@ start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(N
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([Endpoint]) ->
init([AliasName, Endpoint]) ->
Buffer = endpoint_buffer:new(Endpoint, 10),
iot_name_server:register(AliasName, self()),
{ok, #state{endpoint = Endpoint, buffer = Buffer}}.
%% @private

View File

@ -13,7 +13,7 @@
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([start_link/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -41,8 +41,8 @@
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
@ -52,7 +52,8 @@ start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Endpoint]) ->
init([AliasName, Endpoint]) ->
iot_name_server:register(AliasName, self()),
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), create_postman),

View File

@ -13,7 +13,7 @@
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([start_link/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -39,8 +39,8 @@
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []).
start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) ->
gen_statem:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
@ -50,7 +50,8 @@ start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Endpoint]) ->
init([AliasName, Endpoint]) ->
iot_name_server:register(AliasName, self()),
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), create_postman),
@ -129,13 +130,18 @@ handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
handle_info({next_data, Id, ServiceId, Format, Metric}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer,
endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table, fields_map = FieldsMap}}}) ->
{ok, InsertSql, Values} = insert_sql(Table, ServiceId, Format, FieldsMap, Metric),
case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
Error ->
lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]),
case insert_sql(Table, ServiceId, Format, FieldsMap, Metric) of
{ok, InsertSql, Values} ->
case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
Error ->
lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]),
{noreply, State}
end;
error ->
lager:debug("[endpoint_mysql] endpoint: ~p, make sql error", [Title]),
{noreply, State}
end;
@ -176,7 +182,8 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
retry_connect() ->
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman).
-spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> {ok, Sql :: binary(), Values :: list()}.
-spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) ->
error | {ok, Sql :: binary(), Values :: list()}.
insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) ->
case line_format:parse(Metric) of
error ->
@ -184,16 +191,16 @@ insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table
{ok, #{<<"measurement">> := Measurement, <<"tags">> := Tags, <<"fields">> := Fields, <<"timestamp">> := Timestamp}} ->
Map = maps:merge(Tags, Fields),
NMap = Map#{<<"measurement">> => Measurement, <<"timestamp">> => Timestamp},
lists:flatmap(fun({TableField, F}) ->
TableFields = lists:flatmap(fun({TableField, F}) ->
case maps:find(F, NMap) of
error ->
[];
{ok, Val} ->
[{TableField, Val}]
end
end, maps:to_list(FieldsMap)),
end, maps:to_list(FieldsMap)),
{Keys, Values} = kvs(Fields),
{Keys, Values} = kvs(TableFields),
FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)),
Placeholders = lists:duplicate(length(Keys), <<"?">>),
ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)),

View File

@ -33,13 +33,18 @@ handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer(
case endpoint_bo:get_endpoint(Id) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint = #endpoint{name = Name}} ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"start endpoint error">>)}
{ok, EndpointInfo} ->
case endpoint_bo:endpoint_record(EndpointInfo) of
{ok, Endpoint = #endpoint{name = Name}} ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"start endpoint error">>)}
end;
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint invalid">>)}
end
end;
@ -61,19 +66,11 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
case endpoint_bo:get_endpoint(Id) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint = #endpoint{name = Name}} ->
case endpoint:get_pid(Id) of
undefined ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
Pid ->
case endpoint_sup:delete_endpoint(Id) of
ok ->
{ok, EndpointInfo} ->
case endpoint_bo:endpoint_record(EndpointInfo) of
{ok, Endpoint = #endpoint{name = Name}} ->
case endpoint:get_pid(Id) of
undefined ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
@ -81,10 +78,23 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)}
end
Pid ->
case endpoint_sup:delete_endpoint(Id) of
ok ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)}
end
end;
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint invalid">>)}
end
end;

View File

@ -18,7 +18,6 @@
mysql,
gpb,
esockd,
endpoint,
mnesia,
crypto,
public_key,

View File

@ -0,0 +1,137 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 8 2025 00:26
%%%-------------------------------------------------------------------
-module(iot_name_server).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([whereis/1, register/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TAB, iot_name_server).
-record(state, {
%% #{Pid => Name}
pid_names = #{},
refs = []
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec register(Name :: atom(), Pid :: pid()) -> ok.
register(Name, Pid) when is_atom(Name), is_pid(Pid) ->
gen_server:call(?SERVER, {register, Name, Pid}).
-spec whereis(Name :: atom()) -> undefined | pid().
whereis(Name) when is_atom(Name) ->
case ets:lookup(?TAB, Name) of
[] ->
undefined;
[{Name, Pid}|_] ->
case is_process_alive(Pid) of
true ->
Pid;
false ->
undefined
end
end.
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
%%
ets:new(?TAB, [named_table, ordered_set, public, {keypos, 1}]),
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({register, Name, Pid}, _From, State = #state{refs = Refs, pid_names = PidNames}) ->
true = ets:insert(?TAB, {Name, Pid}),
MRef = erlang:monitor(process, Pid),
{reply, ok, State#state{refs = [MRef|Refs], pid_names = maps:put(Pid, Name, PidNames)}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({'DOWN', MRef, process, Pid, Reason}, State = #state{refs = Refs, pid_names = PidNames}) ->
lager:debug("[iot_name_server] pid: ~p, down with reason: ~p", [Reason]),
case lists:member(MRef, Refs) of
true ->
case maps:take(Pid, PidNames) of
error ->
{noreply, State#state{refs = lists:delete(MRef, Refs)}};
{Name, NPidNames} ->
true = ets:delete(?TAB, Name),
{noreply, State#state{pid_names = NPidNames, refs = lists:delete(MRef, Refs)}}
end;
false ->
{noreply, State}
end.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -28,6 +28,15 @@ init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
Specs = [
#{
id => 'iot_name_server',
start => {'iot_name_server', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_name_server']
},
#{
id => 'iot_endpoint_sup',
start => {'iot_endpoint_sup', start_link, []},