diff --git a/apps/iot/include/endpoint.hrl b/apps/iot/include/endpoint.hrl index 844f972..1a48ad2 100644 --- a/apps/iot/include/endpoint.hrl +++ b/apps/iot/include/endpoint.hrl @@ -47,6 +47,7 @@ title = <<>> :: binary(), %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{}, + status = 0, %% 更新时间 updated_at = 0 :: integer(), %% 创建时间 diff --git a/apps/iot/src/endpoint/endpoint_bo.erl b/apps/iot/src/endpoint/endpoint_bo.erl index ba7a893..535d86b 100644 --- a/apps/iot/src/endpoint/endpoint_bo.erl +++ b/apps/iot/src/endpoint/endpoint_bo.erl @@ -8,10 +8,11 @@ %%%------------------------------------------------------------------- -module(endpoint_bo). -author("aresei"). --include("iot.hrl"). +-include("endpoint.hrl"). %% API -export([get_all_endpoints/0, get_endpoint/1]). +-export([endpoint_record/1]). -spec get_all_endpoints() -> [Endpoint :: map()]. get_all_endpoints() -> @@ -24,4 +25,56 @@ get_all_endpoints() -> -spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}. get_endpoint(Id) when is_integer(Id) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]). \ No newline at end of file + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]). + +-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. +endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, + <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> + try + Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])), + {ok, #endpoint { + id = Id, + name = Name, + title = Title, + config = Config, + status = Status, + updated_at = UpdatedAt, + created_at = CreatedAt + }} + catch throw:_ -> + error + end. + +parse_config(<<"mysql">>, #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}) -> + #mysql_endpoint{ + host = Host, + port = Port, + username = Username, + password = Password, + database = Database, + table_name = TableName + }; +parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> + #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }; +parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> + #http_endpoint{ + url = Url, + pool_size = PoolSize + }; +parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + #kafka_endpoint{ + username = Username, + password = Password, + bootstrap_servers = BootstrapServers, + topic = Topic + }; +parse_config(_, _) -> + throw(invalid_config). \ No newline at end of file diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index 7962a7e..101f1ae 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -30,8 +30,14 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, {ok, Endpoints} = endpoint_bo:get_all_endpoints(), - ChildSpecs = lists:map(fun child_spec/1, Endpoints), - + ChildSpecs = lists:flatmap(fun(EndpointInfo) -> + case endpoint_bo:endpoint_record(EndpointInfo) of + error -> + []; + {ok, Endpoint} -> + [Endpoint] + end + end, Endpoints), {ok, {SupFlags, ChildSpecs}}. %% internal functions