fix endpoint

This commit is contained in:
anlicheng 2025-08-13 15:18:02 +08:00
parent 15be60ca08
commit 2e3e34efa4
3 changed files with 64 additions and 4 deletions

View File

@ -47,6 +47,7 @@
title = <<>> :: binary(),
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{},
status = 0,
%%
updated_at = 0 :: integer(),
%%

View File

@ -8,10 +8,11 @@
%%%-------------------------------------------------------------------
-module(endpoint_bo).
-author("aresei").
-include("iot.hrl").
-include("endpoint.hrl").
%% API
-export([get_all_endpoints/0, get_endpoint/1]).
-export([endpoint_record/1]).
-spec get_all_endpoints() -> [Endpoint :: map()].
get_all_endpoints() ->
@ -24,4 +25,56 @@ get_all_endpoints() ->
-spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}.
get_endpoint(Id) when is_integer(Id) ->
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]).
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]).
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson,
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
try
Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])),
{ok, #endpoint {
id = Id,
name = Name,
title = Title,
config = Config,
status = Status,
updated_at = UpdatedAt,
created_at = CreatedAt
}}
catch throw:_ ->
error
end.
parse_config(<<"mysql">>, #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}) ->
#mysql_endpoint{
host = Host,
port = Port,
username = Username,
password = Password,
database = Database,
table_name = TableName
};
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
#mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
};
parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
#http_endpoint{
url = Url,
pool_size = PoolSize
};
parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
#kafka_endpoint{
username = Username,
password = Password,
bootstrap_servers = BootstrapServers,
topic = Topic
};
parse_config(_, _) ->
throw(invalid_config).

View File

@ -30,8 +30,14 @@ start_link() ->
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
{ok, Endpoints} = endpoint_bo:get_all_endpoints(),
ChildSpecs = lists:map(fun child_spec/1, Endpoints),
ChildSpecs = lists:flatmap(fun(EndpointInfo) ->
case endpoint_bo:endpoint_record(EndpointInfo) of
error ->
[];
{ok, Endpoint} ->
[Endpoint]
end
end, Endpoints),
{ok, {SupFlags, ChildSpecs}}.
%% internal functions