fix endpoint

This commit is contained in:
anlicheng 2025-08-12 16:09:27 +08:00
parent 4ea8882585
commit d21df9074c
7 changed files with 346 additions and 43 deletions

View File

@ -36,19 +36,15 @@
username = <<>> :: binary(),
password = <<>> :: binary(),
database = <<>> :: binary(),
table_name = <<>> :: binary(),
pool_size = 10 :: integer()
table_name = <<>> :: binary()
}).
-record(endpoint, {
id :: integer(),
%%
name :: binary(),
%%
title = <<>> :: binary(),
mapper = <<>> :: binary(),
%%
%% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}])
%% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer())
mapper_fun = fun(_, Fields) -> Fields end :: fun(),
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{},
%%

View File

@ -50,6 +50,9 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
clean_up(Pid) when is_pid(Pid) ->
gen_server:call(Pid, clean_up, 5000).
check_name(Name) when is_binary(Name) ->
ok.
-spec config_equals(any(), any()) -> boolean().
config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) ->
true;

View File

@ -0,0 +1,97 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 12. 8 2025 15:12
%%%-------------------------------------------------------------------
-module(endpoint_mnesia).
-author("aresei").
-include("endpoint.hrl").
-include_lib("stdlib/include/qlc.hrl").
-define(TAB, endpoint).
%% API
-export([create_table/0]).
-export([insert/1, delete/1, check_name/1]).
-export([get_endpoint/1]).
-export([as_map/1]).
create_table() ->
%% id生成器
mnesia:create_table(endpoint, [
{attributes, record_info(fields, endpoint)},
{record_name, endpoint},
{disc_copies, [node()]},
{type, ordered_set}
]).
-spec check_name(Name :: binary()) -> boolean() | {error, Reason :: any()}.
check_name(Name) when is_binary(Name) ->
Fun = fun() ->
Q = qlc:q([E || E <- mnesia:table(?TAB), E#endpoint.name =:= Name]),
case qlc:e(Q) of
[] ->
false;
[_|_] ->
true
end
end,
case mnesia:transaction(Fun) of
{'atomic', Res} ->
Res;
{'aborted', Reason} ->
{error, Reason}
end.
-spec get_endpoint(Id :: integer()) -> error | {ok, Endpoint :: #endpoint{}}.
get_endpoint(Id) when is_integer(Id) ->
case mnesia:dirty_read(?TAB, Id) of
[] ->
error;
[Endpoint | _] ->
{ok, Endpoint}
end.
-spec insert(Endpoint :: #endpoint{}) -> ok | {error, Reason :: term()}.
insert(Endpoint = #endpoint{}) ->
case mnesia:transaction(fun() -> mnesia:write(?TAB, Endpoint, write) end) of
{'atomic', ok} ->
ok;
{'aborted', Reason} ->
{error, Reason}
end.
-spec delete(Id :: integer()) -> ok | {error, Reason :: any()}.
delete(Id) when is_integer(Id) ->
case mnesia:transaction(fun() -> mnesia:delete(?TAB, Id, write) end) of
{'atomic', ok} ->
ok;
{'aborted', Reason} ->
{error, Reason}
end.
-spec as_map(Endpoint :: #endpoint{}) -> map().
as_map(#endpoint{id = Id, name = Name, title = Title, config = Config, updated_at = UpdateTs, created_at = CreateTs}) ->
{ConfigKey, ConfigMap} =
case Config of
#http_endpoint{url = Url, pool_size = PoolSize} ->
{<<"http">>, #{<<"url">> => Url, <<"pool_size">> => PoolSize}};
#mqtt_endpoint{host = Host, port = Port, client_id = ClientId, username = Username, password = Password, topic = Topic, qos = Qos} ->
{<<"mqtt">>, #{<<"host">> => Host, <<"port">> => Port, <<"client_id">> => ClientId, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}};
#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic} ->
{<<"kafka">>, #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}};
#mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName} ->
{<<"mysql">>, #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"database">> => Database, <<"table_name">> => TableName}}
end,
Map = #{
<<"id">> => Id,
<<"name">> => Name,
<<"title">> => Title,
<<"update_ts">> => UpdateTs,
<<"create_ts">> => CreateTs
},
Map#{ConfigKey => ConfigMap}.

View File

@ -8,7 +8,7 @@
%%%-------------------------------------------------------------------
-module(endpoint_handler).
-author("licheng5").
-include("iot.hrl").
-include("endpoint.hrl").
%% API
-export([handle_request/4]).
@ -17,48 +17,254 @@
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle_request("POST", "/endpoint/create", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) ->
lager:debug("[device_handler] host_id: ~p, will reload device uuid: ~p", [HostId, DeviceUUID]),
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"reload device failed">>)};
HostPid when is_pid(HostPid) ->
case iot_host:reload_device(HostPid, DeviceUUID) of
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_binary(Name), is_binary(Title), is_binary(Url), is_integer(PoolSize) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #http_endpoint{
url = Url,
pool_size = PoolSize
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[device_handler] reload device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"reload device failed">>)}
end
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
%%
handle_request("POST", "/endpoint/reload", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) ->
lager:debug("[device_handler] host_id: ~p, will reload device uuid: ~p", [HostId, DeviceUUID]),
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"reload device failed">>)};
HostPid when is_pid(HostPid) ->
case iot_host:reload_device(HostPid, DeviceUUID) of
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}})
when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[device_handler] reload device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"reload device failed">>)}
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}})
when is_binary(Name), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #kafka_endpoint{
username = Username,
password = Password,
topic = Topic,
bootstrap_servers = BootstrapServers
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
handle_request("POST", "/endpoint/create", _, #{<<"name">> := Name, <<"title">> := Title,
<<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}})
when is_binary(Name), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) ->
case endpoint_mnesia:check_name(Name) of
true ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)};
false ->
Endpoint = #endpoint {
name = Name,
title = Title,
config = #mysql_endpoint{
host = Host,
port = Port,
username = Username,
password = Password,
database = Database,
table_name = TableName
},
updated_at = iot_util:current_time(),
created_at = iot_util:current_time()
},
case endpoint_mnesia:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
{error, Reason} ->
lager:warning("[endpoint_handler] create endpoint: ~p, get error: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end;
%% endpoint信息
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title,
<<"http">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(Id), is_binary(Title), is_binary(Url), is_integer(PoolSize) ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} ->
NEndpoint = Endpoint#endpoint {
title = Title,
config = #http_endpoint{
url = Url,
pool_size = PoolSize
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end
end;
%%
handle_request("POST", "/endpoint/delete", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) ->
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"delete device failed">>)};
HostPid when is_pid(HostPid) ->
ok = iot_host:delete_device(HostPid, DeviceUUID),
{ok, 200, iot_util:json_data(<<"success">>)}
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title,
<<"mqtt">> := #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}})
when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(ClientId), is_binary(Username), is_binary(Password), is_binary(Topic), is_integer(Qos) ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} ->
NEndpoint = Endpoint#endpoint {
title = Title,
config = #mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end
end;
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title,
<<"kafka">> := #{<<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"bootstrap_servers">> := BootstrapServers}})
when is_integer(Id), is_binary(Title), is_binary(Password), is_binary(Topic), is_list(BootstrapServers) ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} ->
NEndpoint = Endpoint#endpoint {
title = Title,
config = #kafka_endpoint{
username = Username,
password = Password,
topic = Topic,
bootstrap_servers = BootstrapServers
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end
end;
handle_request("POST", "/endpoint/update", _, #{<<"id">> := Id, <<"title">> := Title,
<<"mysql">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}})
when is_integer(Id), is_binary(Title), is_binary(Host), is_integer(Port), is_binary(Username), is_binary(Password), is_binary(Database), is_binary(TableName) ->
case endpoint_mnesia:get_endpoint(Id) of
error ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} ->
NEndpoint = Endpoint#endpoint {
title = Title,
config = #mysql_endpoint{
host = Host,
port = Port,
username = Username,
password = Password,
database = Database,
table_name = TableName
},
updated_at = iot_util:current_time()
},
case endpoint_mnesia:insert(NEndpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] update endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"check endpoint name get error">>)}
end
end;
%% Endpoint信息
handle_request("POST", "/endpoint/delete", _, #{<<"id">> := Id}) when is_integer(Id) ->
case endpoint_mnesia:delete(Id) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[endpoint_handler] delete endpoint: ~p, get error: ~p", [Id, Reason]),
{ok, 200, iot_util:json_error(404, <<"delete endpoint failed">>)}
end;
handle_request(_, Path, _, _) ->

View File

@ -35,10 +35,9 @@ start() ->
{backlog, Backlog},
{max_connections, MaxConnections}
],
{ok, Pid} = cowboy:start_clear(http_listener, TransOpts, #{env => #{dispatch => Dispatcher}}),
lager:debug("[iot_app] the http server start at: ~p, pid is: ~p", [Port, Pid]).
lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]).
init(Req0, Opts = [Mod|_]) ->
Method = binary_to_list(cowboy_req:method(Req0)),

View File

@ -97,7 +97,9 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig
end;
%%
handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}},
State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of

View File

@ -63,7 +63,7 @@
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
[
{host, "127.0.0.1"},
{port, 26379},
{port, 6379},
{database, 1}
]
}