From d21df9074cb9bc2f393225b02cff07ecc7225845 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 12 Aug 2025 16:09:27 +0800 Subject: [PATCH] fix endpoint --- apps/endpoint/include/endpoint.hrl | 10 +- apps/endpoint/src/endpoint.erl | 3 + apps/endpoint/src/endpoint_mnesia.erl | 97 +++++++++ apps/iot/src/http/endpoint_handler.erl | 270 ++++++++++++++++++++++--- apps/iot/src/http/http_server.erl | 3 +- apps/iot/src/postman/mqtt_postman.erl | 4 +- config/sys-dev.config | 2 +- 7 files changed, 346 insertions(+), 43 deletions(-) create mode 100644 apps/endpoint/src/endpoint_mnesia.erl diff --git a/apps/endpoint/include/endpoint.hrl b/apps/endpoint/include/endpoint.hrl index 0789fbd..844f972 100644 --- a/apps/endpoint/include/endpoint.hrl +++ b/apps/endpoint/include/endpoint.hrl @@ -36,19 +36,15 @@ username = <<>> :: binary(), password = <<>> :: binary(), database = <<>> :: binary(), - table_name = <<>> :: binary(), - pool_size = 10 :: integer() + table_name = <<>> :: binary() }). -record(endpoint, { id :: integer(), + %% 全局唯一,在路由规则中通过名称来指定 + name :: binary(), %% 标题描述 title = <<>> :: binary(), - mapper = <<>> :: binary(), - %% 数据转换规则,基于 - %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}]) - %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer()) - mapper_fun = fun(_, Fields) -> Fields end :: fun(), %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{}, %% 更新时间 diff --git a/apps/endpoint/src/endpoint.erl b/apps/endpoint/src/endpoint.erl index b6697c5..5e54725 100644 --- a/apps/endpoint/src/endpoint.erl +++ b/apps/endpoint/src/endpoint.erl @@ -50,6 +50,9 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> clean_up(Pid) when is_pid(Pid) -> gen_server:call(Pid, clean_up, 5000). +check_name(Name) when is_binary(Name) -> + ok. + -spec config_equals(any(), any()) -> boolean(). config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> true; diff --git a/apps/endpoint/src/endpoint_mnesia.erl b/apps/endpoint/src/endpoint_mnesia.erl new file mode 100644 index 0000000..a1abbdb --- /dev/null +++ b/apps/endpoint/src/endpoint_mnesia.erl @@ -0,0 +1,97 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 12. 8月 2025 15:12 +%%%------------------------------------------------------------------- +-module(endpoint_mnesia). +-author("aresei"). +-include("endpoint.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB, endpoint). + +%% API +-export([create_table/0]). +-export([insert/1, delete/1, check_name/1]). +-export([get_endpoint/1]). +-export([as_map/1]). + +create_table() -> + %% id生成器 + mnesia:create_table(endpoint, [ + {attributes, record_info(fields, endpoint)}, + {record_name, endpoint}, + {disc_copies, [node()]}, + {type, ordered_set} + ]). + +-spec check_name(Name :: binary()) -> boolean() | {error, Reason :: any()}. +check_name(Name) when is_binary(Name) -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB), E#endpoint.name =:= Name]), + case qlc:e(Q) of + [] -> + false; + [_|_] -> + true + end + end, + case mnesia:transaction(Fun) of + {'atomic', Res} -> + Res; + {'aborted', Reason} -> + {error, Reason} + end. + +-spec get_endpoint(Id :: integer()) -> error | {ok, Endpoint :: #endpoint{}}. +get_endpoint(Id) when is_integer(Id) -> + case mnesia:dirty_read(?TAB, Id) of + [] -> + error; + [Endpoint | _] -> + {ok, Endpoint} + end. + +-spec insert(Endpoint :: #endpoint{}) -> ok | {error, Reason :: term()}. +insert(Endpoint = #endpoint{}) -> + case mnesia:transaction(fun() -> mnesia:write(?TAB, Endpoint, write) end) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. + +-spec delete(Id :: integer()) -> ok | {error, Reason :: any()}. +delete(Id) when is_integer(Id) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB, Id, write) end) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. + +-spec as_map(Endpoint :: #endpoint{}) -> map(). +as_map(#endpoint{id = Id, name = Name, title = Title, config = Config, updated_at = UpdateTs, created_at = CreateTs}) -> + {ConfigKey, ConfigMap} = + case Config of + #http_endpoint{url = Url, pool_size = PoolSize} -> + {<<"http">>, #{<<"url">> => Url, <<"pool_size">> => PoolSize}}; + #mqtt_endpoint{host = Host, port = Port, client_id = ClientId, username = Username, password = Password, topic = Topic, qos = Qos} -> + {<<"mqtt">>, #{<<"host">> => Host, <<"port">> => Port, <<"client_id">> => ClientId, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}; + #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic} -> + {<<"kafka">>, #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}}; + #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName} -> + {<<"mysql">>, #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"database">> => Database, <<"table_name">> => TableName}} + end, + + Map = #{ + <<"id">> => Id, + <<"name">> => Name, + <<"title">> => Title, + <<"update_ts">> => UpdateTs, + <<"create_ts">> => CreateTs + }, + Map#{ConfigKey => ConfigMap}. \ No newline at end of file diff --git a/apps/iot/src/http/endpoint_handler.erl b/apps/iot/src/http/endpoint_handler.erl index 3bc839d..27bcf38 100644 --- a/apps/iot/src/http/endpoint_handler.erl +++ b/apps/iot/src/http/endpoint_handler.erl @@ -8,7 +8,7 @@ %%%------------------------------------------------------------------- -module(endpoint_handler). -author("licheng5"). --include("iot.hrl"). +-include("endpoint.hrl"). %% API -export([handle_request/4]). @@ -17,48 +17,254 @@ %% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -handle_request("POST", "/endpoint/create", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) -> - lager:debug("[device_handler] host_id: ~p, will reload device uuid: ~p", [HostId, DeviceUUID]), - AliasName = iot_host:get_alias_name(HostId), - case global:whereis_name(AliasName) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"reload device failed">>)}; - HostPid when is_pid(HostPid) -> - case iot_host:reload_device(HostPid, DeviceUUID) of +handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, + <<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_binary(Name), is_binary(Title), is_binary(Url), is_integer(PoolSize) -> + case endpoint_mnesia:check_name(Name) of + true -> + {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; + false -> + Endpoint = #endpoint { + name = Name, + title = Title, + config = #http_endpoint{ + url = Url, + pool_size = PoolSize + }, + updated_at = iot_util:current_time(), + created_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(Endpoint) of ok -> {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> - lager:debug("[device_handler] reload device: ~p, get error: ~p", [DeviceUUID, Reason]), - {ok, 200, iot_util:json_error(404, <<"reload device failed">>)} - end + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + {error, Reason} -> + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} end; -%% 重新加载对应的主机信息 -handle_request("POST", "/endpoint/reload", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) -> - lager:debug("[device_handler] host_id: ~p, will reload device uuid: ~p", [HostId, DeviceUUID]), - AliasName = iot_host:get_alias_name(HostId), - case global:whereis_name(AliasName) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"reload device failed">>)}; - HostPid when is_pid(HostPid) -> - case iot_host:reload_device(HostPid, DeviceUUID) of +handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, + <<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}) + when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) -> + + case endpoint_mnesia:check_name(Name) of + true -> + {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; + false -> + Endpoint = #endpoint { + name = Name, + title = Title, + config = #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }, + updated_at = iot_util:current_time(), + created_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(Endpoint) of ok -> {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> - lager:debug("[device_handler] reload device: ~p, get error: ~p", [DeviceUUID, Reason]), - {ok, 200, iot_util:json_error(404, <<"reload device failed">>)} + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + {error, Reason} -> + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + +handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, + <<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}}) + when is_binary(Name), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) -> + + case endpoint_mnesia:check_name(Name) of + true -> + {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; + false -> + Endpoint = #endpoint { + name = Name, + title = Title, + config = #kafka_endpoint{ + username = Username, + password = Password, + topic = Topic, + bootstrap_servers = BootstrapServers + }, + updated_at = iot_util:current_time(), + created_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(Endpoint) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + {error, Reason} -> + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + +handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title, + <<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}) + when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) -> + + case endpoint_mnesia:check_name(Name) of + true -> + {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}; + false -> + Endpoint = #endpoint { + name = Name, + title = Title, + config = #mysql_endpoint{ + host = Host, + port = Port, + username = Username, + password = Password, + database = Database, + table_name = TableName + }, + updated_at = iot_util:current_time(), + created_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(Endpoint) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + {error, Reason} -> + lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end; + +%% 更新endpoint信息 +handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, + <<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(Id), is_binary(Title), is_binary(Url), is_integer(PoolSize) -> + case endpoint_mnesia:get_endpoint(Id) of + error -> + {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; + {ok, Endpoint} -> + NEndpoint = Endpoint#endpoint { + title = Title, + config = #http_endpoint{ + url = Url, + pool_size = PoolSize + }, + updated_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(NEndpoint) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} end end; -%% 删除对应的主机信息 -handle_request("POST", "/endpoint/delete", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) -> - AliasName = iot_host:get_alias_name(HostId), - case global:whereis_name(AliasName) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"delete device failed">>)}; - HostPid when is_pid(HostPid) -> - ok = iot_host:delete_device(HostPid, DeviceUUID), - {ok, 200, iot_util:json_data(<<"success">>)} +handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, + <<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}) + when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) -> + + case endpoint_mnesia:get_endpoint(Id) of + error -> + {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; + {ok, Endpoint} -> + NEndpoint = Endpoint#endpoint { + title = Title, + config = #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }, + updated_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(NEndpoint) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end + end; + +handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, + <<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}}) + when is_integer(Id), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) -> + + case endpoint_mnesia:get_endpoint(Id) of + error -> + {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; + {ok, Endpoint} -> + NEndpoint = Endpoint#endpoint { + title = Title, + config = #kafka_endpoint{ + username = Username, + password = Password, + topic = Topic, + bootstrap_servers = BootstrapServers + }, + updated_at = iot_util:current_time() + }, + case endpoint_mnesia:insert(NEndpoint) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end + end; + +handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title, + <<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}) + when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) -> + + case endpoint_mnesia:get_endpoint(Id) of + error -> + {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; + {ok, Endpoint} -> + NEndpoint = Endpoint#endpoint { + title = Title, + config = #mysql_endpoint{ + host = Host, + port = Port, + username = Username, + password = Password, + database = Database, + table_name = TableName + }, + updated_at = iot_util:current_time() + }, + + case endpoint_mnesia:insert(NEndpoint) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]), + {ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)} + end + end; + +%% 删除Endpoint信息 +handle_request("POST", "/endpoint/delete", _, #{<<"id">> := Id}) when is_integer(Id) -> + case endpoint_mnesia:delete(Id) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:debug("[endpoint_handler] delete endpoint: ~p, get error: ~p", [Id, Reason]), + {ok, 200, iot_util:json_error(404, <<"delete endpoint failed">>)} end; handle_request(_, Path, _, _) -> diff --git a/apps/iot/src/http/http_server.erl b/apps/iot/src/http/http_server.erl index f23fc61..ea4debe 100644 --- a/apps/iot/src/http/http_server.erl +++ b/apps/iot/src/http/http_server.erl @@ -35,10 +35,9 @@ start() -> {backlog, Backlog}, {max_connections, MaxConnections} ], - {ok, Pid} = cowboy:start_clear(http_listener, TransOpts, #{env => #{dispatch => Dispatcher}}), - lager:debug("[iot_app] the http server start at: ~p, pid is: ~p", [Port, Pid]). + lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]). init(Req0, Opts = [Mod|_]) -> Method = binary_to_list(cowboy_req:method(Req0)), diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 6e0177b..41752aa 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -97,7 +97,9 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig end; %% 转发信息 -handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> +handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, + State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> + Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of diff --git a/config/sys-dev.config b/config/sys-dev.config index 29bdc1c..edf5fcf 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -63,7 +63,7 @@ [{size, 10}, {max_overflow, 20}, {worker_module, eredis}], [ {host, "127.0.0.1"}, - {port, 26379}, + {port, 6379}, {database, 1} ] }