fix endpoint

This commit is contained in:
anlicheng 2025-08-12 18:02:16 +08:00
parent 87c3e46289
commit 923b2c7a9e
3 changed files with 48 additions and 238 deletions

View File

@ -11,7 +11,7 @@
-include("iot.hrl"). -include("iot.hrl").
%% API %% API
-export([get_all_endpoints/0, get_endpoint_by_id/1]). -export([get_all_endpoints/0, get_endpoint/1]).
-spec get_all_endpoints() -> [Endpoint :: map()]. -spec get_all_endpoints() -> [Endpoint :: map()].
get_all_endpoints() -> get_all_endpoints() ->
@ -22,6 +22,6 @@ get_all_endpoints() ->
[] []
end. end.
-spec get_endpoint_by_id(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}. -spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}.
get_endpoint_by_id(Id) when is_integer(Id) -> get_endpoint(Id) when is_integer(Id) ->
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]). mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]).

View File

@ -3,7 +3,7 @@
%% @end %% @end
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(iot_endpoint_sup). -module(endpoint_sup).
-behaviour(supervisor). -behaviour(supervisor).
-include("endpoint.hrl"). -include("endpoint.hrl").
@ -47,6 +47,7 @@ ensured_endpoint_started(Endpoint = #endpoint{}) ->
{error, Error} {error, Error}
end. end.
-spec delete_endpoint(Id :: integer()) -> ok | {error, Reason :: any()}.
delete_endpoint(Id) when is_integer(Id) -> delete_endpoint(Id) when is_integer(Id) ->
Name = endpoint:get_name(Id), Name = endpoint:get_name(Id),
supervisor:terminate_child(?MODULE, Name), supervisor:terminate_child(?MODULE, Name),

View File

@ -17,256 +17,65 @@
%% helper methods %% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer(Id) ->
<<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_binary(Name), is_binary(Title), is_binary(Url), is_integer(PoolSize) -> case endpoint_bo:get_endpoint(Id) of
case endpoint_mnesia:check_name(Name) of undefined ->
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #http_endpoint{
url = Url,
pool_size = PoolSize
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}})
when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}})
when is_binary(Name), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #kafka_endpoint{
username = Username,
password = Password,
topic = Topic,
bootstrap_servers = BootstrapServers
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}})
when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #mysql_endpoint{
host = Host,
port = Port,
username = Username,
password = Password,
database = Database,
table_name = TableName
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
%% endpoint信息
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title,
<<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(Id), is_binary(Title), is_binary(Url), is_integer(PoolSize) ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} -> {ok, Endpoint = #endpoint{name = Name}} ->
NEndpoint = Endpoint#endpoint { case endpoint_sup:ensured_endpoint_started(Endpoint) of
title = Title, {ok, Pid} when is_pid(Pid) ->
config = #http_endpoint{
url = Url,
pool_size = PoolSize
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)}; {ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} -> {error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} {ok, 200, iot_util:json_error(404, <<"start endpoint error">>)}
end end
end; end;
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, handle_request("POST", "/endpoint/stop", _, #{<<"id">> := Id}) when is_integer(Id) ->
<<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}) case endpoint_bo:get_endpoint(Id) of
when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) -> undefined ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} -> {ok, _} ->
NEndpoint = Endpoint#endpoint { case endpoint_sup:delete_endpoint(Id) of
title = Title, {ok, Pid} when is_pid(Pid) ->
config = #mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)}; {ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} -> {error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), lager:warning("[endpoint_handler] stop endpoint id: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} {ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)}
end end
end; end;
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_integer(Id) ->
<<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}}) case endpoint_bo:get_endpoint(Id) of
when is_integer(Id), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) -> undefined ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} -> {ok, Endpoint = #endpoint{name = Name}} ->
NEndpoint = Endpoint#endpoint { case endpoint:get_pid(Id) of
title = Title, undefined ->
config = #kafka_endpoint{ case endpoint_sup:ensured_endpoint_started(Endpoint) of
username = Username, {ok, Pid} when is_pid(Pid) ->
password = Password, {ok, 200, iot_util:json_data(<<"success">>)};
topic = Topic, {error, Reason} ->
bootstrap_servers = BootstrapServers lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
}, {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
updated_at = iot_util:current_time() end;
}, Pid ->
case endpoint_mnesia:insert(NEndpoint) of case endpoint_sup:delete_endpoint(Id) of
ok -> ok ->
{ok, 200, iot_util:json_data(<<"success">>)}; case endpoint_sup:ensured_endpoint_started(Endpoint) of
{error, Reason} -> {ok, Pid} when is_pid(Pid) ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), {ok, 200, iot_util:json_data(<<"success">>)};
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} {error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)}
end
end end
end; end;
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title,
<<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}})
when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} ->
NEndpoint = Endpoint#endpoint {
title = Title,
config = #mysql_endpoint{
host = Host,
port = Port,
username = Username,
password = Password,
database = Database,
table_name = TableName
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end
end;
%% Endpoint信息
handle_request("POST", "/endpoint/delete", _, #{<<"id">> := Id}) when is_integer(Id) ->
case endpoint_mnesia:delete(Id) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[endpoint_handler] delete endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"delete endpoint failed">>)}
end;
handle_request(_, Path, _, _) -> handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path), Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.