From 923b2c7a9e21ff1c17b5f585dd683c01ca4920c3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 12 Aug 2025 18:02:16 +0800 Subject: [PATCH] fix endpoint --- apps/iot/src/endpoint/endpoint_bo.erl | 6 +- .../endpoint_sup.erl} | 3 +- apps/iot/src/http/endpoint_handler.erl | 277 +++--------------- 3 files changed, 48 insertions(+), 238 deletions(-) rename apps/iot/src/{iot_endpoint_sup.erl => endpoint/endpoint_sup.erl} (95%) diff --git a/apps/iot/src/endpoint/endpoint_bo.erl b/apps/iot/src/endpoint/endpoint_bo.erl index 843afa3..ba7a893 100644 --- a/apps/iot/src/endpoint/endpoint_bo.erl +++ b/apps/iot/src/endpoint/endpoint_bo.erl @@ -11,7 +11,7 @@ -include("iot.hrl"). %% API --export([get_all_endpoints/0, get_endpoint_by_id/1]). +-export([get_all_endpoints/0, get_endpoint/1]). -spec get_all_endpoints() -> [Endpoint :: map()]. get_all_endpoints() -> @@ -22,6 +22,6 @@ get_all_endpoints() -> [] end. --spec get_endpoint_by_id(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}. -get_endpoint_by_id(Id) when is_integer(Id) -> +-spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}. +get_endpoint(Id) when is_integer(Id) -> mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]). \ No newline at end of file diff --git a/apps/iot/src/iot_endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl similarity index 95% rename from apps/iot/src/iot_endpoint_sup.erl rename to apps/iot/src/endpoint/endpoint_sup.erl index 671a24f..7962a7e 100644 --- a/apps/iot/src/iot_endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -3,7 +3,7 @@ %% @end %%%------------------------------------------------------------------- --module(iot_endpoint_sup). +-module(endpoint_sup). -behaviour(supervisor). -include("endpoint.hrl"). @@ -47,6 +47,7 @@ ensured_endpoint_started(Endpoint = #endpoint{}) -> {error, Error} end. +-spec delete_endpoint(Id :: integer()) -> ok | {error, Reason :: any()}. delete_endpoint(Id) when is_integer(Id) -> Name = endpoint:get_name(Id), supervisor:terminate_child(?MODULE, Name), diff --git a/apps/iot/src/http/endpoint_handler.erl b/apps/iot/src/http/endpoint_handler.erl index 27bcf38..e5a2a7a 100644 --- a/apps/iot/src/http/endpoint_handler.erl +++ b/apps/iot/src/http/endpoint_handler.erl @@ -17,256 +17,65 @@ %% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, - <<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_binary(Name), is_binary(Title), is_binary(Url), is_integer(PoolSize) -> - case endpoint_mnesia:check_name(Name) of - true -> - {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; - false -> - Endpoint = #endpoint { - name = Name, - title = Title, - config = #http_endpoint{ - url = Url, - pool_size = PoolSize - }, - updated_at = iot_util:current_time(), - created_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(Endpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - -handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, - <<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}) - when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) -> - - case endpoint_mnesia:check_name(Name) of - true -> - {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; - false -> - Endpoint = #endpoint { - name = Name, - title = Title, - config = #mqtt_endpoint{ - host = Host, - port = Port, - client_id = ClientId, - username = Username, - password = Password, - topic = Topic, - qos = Qos - }, - updated_at = iot_util:current_time(), - created_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(Endpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - -handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, - <<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}}) - when is_binary(Name), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) -> - - case endpoint_mnesia:check_name(Name) of - true -> - {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; - false -> - Endpoint = #endpoint { - name = Name, - title = Title, - config = #kafka_endpoint{ - username = Username, - password = Password, - topic = Topic, - bootstrap_servers = BootstrapServers - }, - updated_at = iot_util:current_time(), - created_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(Endpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - -handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, - <<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}) - when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) -> - - case endpoint_mnesia:check_name(Name) of - true -> - {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; - false -> - Endpoint = #endpoint { - name = Name, - title = Title, - config = #mysql_endpoint{ - host = Host, - port = Port, - username = Username, - password = Password, - database = Database, - table_name = TableName - }, - updated_at = iot_util:current_time(), - created_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(Endpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - {error, Reason} -> - lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end; - -%% 更新endpoint信息 -handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, - <<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(Id), is_binary(Title), is_binary(Url), is_integer(PoolSize) -> - case endpoint_mnesia:get_endpoint(Id) of - error -> +handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer(Id) -> + case endpoint_bo:get_endpoint(Id) of + undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint} -> - NEndpoint = Endpoint#endpoint { - title = Title, - config = #http_endpoint{ - url = Url, - pool_size = PoolSize - }, - updated_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(NEndpoint) of - ok -> + {ok, Endpoint = #endpoint{name = Name}} -> + case endpoint_sup:ensured_endpoint_started(Endpoint) of + {ok, Pid} when is_pid(Pid) -> {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> - lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"start endpoint error">>)} end end; -handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, - <<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}) - when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) -> - - case endpoint_mnesia:get_endpoint(Id) of - error -> +handle_request("POST", "/endpoint/stop", _, #{<<"id">> := Id}) when is_integer(Id) -> + case endpoint_bo:get_endpoint(Id) of + undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint} -> - NEndpoint = Endpoint#endpoint { - title = Title, - config = #mqtt_endpoint{ - host = Host, - port = Port, - client_id = ClientId, - username = Username, - password = Password, - topic = Topic, - qos = Qos - }, - updated_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(NEndpoint) of - ok -> + {ok, _} -> + case endpoint_sup:delete_endpoint(Id) of + {ok, Pid} when is_pid(Pid) -> {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> - lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + lager:warning("[endpoint_handler] stop endpoint id: ~p, get error: ~p", [Id, Reason]), + {ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)} end end; -handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, - <<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}}) - when is_integer(Id), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) -> - - case endpoint_mnesia:get_endpoint(Id) of - error -> +handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_integer(Id) -> + case endpoint_bo:get_endpoint(Id) of + undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint} -> - NEndpoint = Endpoint#endpoint { - title = Title, - config = #kafka_endpoint{ - username = Username, - password = Password, - topic = Topic, - bootstrap_servers = BootstrapServers - }, - updated_at = iot_util:current_time() - }, - case endpoint_mnesia:insert(NEndpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + {ok, Endpoint = #endpoint{name = Name}} -> + case endpoint:get_pid(Id) of + undefined -> + case endpoint_sup:ensured_endpoint_started(Endpoint) of + {ok, Pid} when is_pid(Pid) -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)} + end; + Pid -> + case endpoint_sup:delete_endpoint(Id) of + ok -> + case endpoint_sup:ensured_endpoint_started(Endpoint) of + {ok, Pid} when is_pid(Pid) -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)} + end; + {error, Reason} -> + lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)} + end end end; -handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, - <<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}) - when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) -> - - case endpoint_mnesia:get_endpoint(Id) of - error -> - {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint} -> - NEndpoint = Endpoint#endpoint { - title = Title, - config = #mysql_endpoint{ - host = Host, - port = Port, - username = Username, - password = Password, - database = Database, - table_name = TableName - }, - updated_at = iot_util:current_time() - }, - - case endpoint_mnesia:insert(NEndpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), - {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} - end - end; - -%% 删除Endpoint信息 -handle_request("POST", "/endpoint/delete", _, #{<<"id">> := Id}) when is_integer(Id) -> - case endpoint_mnesia:delete(Id) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[endpoint_handler] delete endpoint: ~p, get error: ~p", [Id, Reason]), - {ok, 200, iot_util:json_error(404, <<"delete endpoint failed">>)} - end; - handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file