fix endpoints

This commit is contained in:
anlicheng 2025-11-11 22:28:28 +08:00
parent 304b83420d
commit 34cddd70f1
4 changed files with 11 additions and 11 deletions

View File

@ -59,7 +59,7 @@ clean_up(Pid) when is_pid(Pid) ->
gen_server:call(Pid, clean_up, 5000).
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson,
endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson,
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
try
Config = parse_config(Type, ConfigJson),
@ -76,7 +76,8 @@ endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title
error
end.
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port0, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
Port = if is_binary(Port0) -> binary_to_integer(Port0); is_integer(Port0) -> Port0 end,
#mqtt_endpoint{
host = Host,
port = Port,

View File

@ -95,7 +95,7 @@ handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED,
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
lager:debug("[endpoint_mqtt] endpoint: ~p, create postman", [Title]),
lager:debug("[endpoint_mqtt] endpoint: ~ts, create postman", [Title]),
Opts = [
{owner, self()},
{clientid, ClientId},

View File

@ -30,13 +30,12 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
Endpoints = iot_api:get_all_endpoints(),
lager:debug("ep: ~p", [Endpoints]),
ChildSpecs = lists:flatmap(fun(EndpointInfo) ->
case endpoint:endpoint_record(EndpointInfo) of
error ->
[];
{ok, Endpoint} ->
[Endpoint]
[child_spec(Endpoint)]
end
end, Endpoints),
{ok, {SupFlags, ChildSpecs}}.

View File

@ -35,12 +35,12 @@ handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer(
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, EndpointInfo} ->
case endpoint:endpoint_record(EndpointInfo) of
{ok, Endpoint = #endpoint{name = Name}} ->
{ok, Endpoint = #endpoint{title = Title}} ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]),
{ok, 200, iot_util:json_error(404, <<"start endpoint error">>)}
end;
error ->
@ -68,14 +68,14 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, EndpointInfo} ->
case endpoint:endpoint_record(EndpointInfo) of
{ok, Endpoint = #endpoint{name = Name}} ->
{ok, Endpoint = #endpoint{title = Title}} ->
case endpoint:get_pid(Id) of
undefined ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
Pid ->
@ -85,11 +85,11 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
{ok, Pid} when is_pid(Pid) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Name, Reason]),
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]),
{ok, 200, iot_util:json_error(404, <<"stop endpoint error">>)}
end
end;