fix endpoint

This commit is contained in:
anlicheng 2025-11-14 14:40:53 +08:00
parent 8004046136
commit e0aefc4fe9
5 changed files with 300 additions and 63 deletions

View File

@ -14,7 +14,7 @@
-export([start_link/1]).
-export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]).
-export([get_alias_pid/1, is_support/1, get_protocol/1]).
-export([endpoint_record/1]).
-export([endpoint_record/1, parse_config/2]).
%%%===================================================================
%%% API
@ -75,69 +75,212 @@ is_support(Protocol) when is_atom(Protocol) ->
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson,
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
try
Config = parse_config(Type, ConfigJson),
{ok, #endpoint {
id = Id,
matcher = Matcher,
title = Title,
config = Config,
status = Status,
updated_at = UpdatedAt,
created_at = CreatedAt
}}
catch throw:_ ->
error
case parse_config(Type, ConfigJson) of
{ok, Config} ->
{ok, #endpoint {
id = Id,
matcher = Matcher,
title = Title,
config = Config,
status = Status,
updated_at = UpdatedAt,
created_at = CreatedAt
}};
{error, _Reason} ->
error
end.
-spec parse_config(Protocol :: binary(), Config :: map()) -> {ok, #mqtt_endpoint{} | #kafka_endpoint{} | #http_endpoint{}} | {error, Errors :: [Error :: binary()]}.
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port0, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
Port = if is_binary(Port0) -> binary_to_integer(Port0); is_integer(Port0) -> Port0 end,
#mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
};
Errors = lists:filtermap(fun(Term) ->
case check_mqtt_argument(Term) of
ok ->
false;
{error, Error} ->
{true, Error}
end
end, [{host, Host}, {port, Port}, {username, Username}, {password, Password}, {topic, Topic}, {qos, Qos}]),
case Errors =:= [] of
true ->
{ok, #mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
username = Username,
password = Password,
topic = Topic,
qos = Qos
}};
false ->
{error, Errors}
end;
parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
#http_endpoint{
{ok, #http_endpoint{
url = Url,
pool_size = PoolSize
};
}};
parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
Mechanism = case Mechanism0 of
<<"sha_256">> ->
scram_sha_256;
<<"sha_512">> ->
scram_sha_512;
<<"plain">> ->
plain;
_ ->
plain
end,
#kafka_endpoint{
sasl_config = {Mechanism, Username, Password},
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
topic = Topic
};
parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
#kafka_endpoint{
sasl_config = undefined,
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
topic = Topic
};
parse_config(_, _) ->
throw(invalid_config).
parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) ->
lists:filtermap(fun(S) ->
case binary:split(S, <<":">>) of
[Host0, Port0] ->
{true, {binary_to_list(Host0), binary_to_integer(Port0)}};
_ ->
false
Errors = lists:filtermap(fun(Term) ->
case check_kafka_argument(Term) of
ok ->
false;
{error, Error} ->
{true, Error}
end
end, BootstrapServers).
end, [{username, Username}, {password, Password}, {topic, Topic}, {mechanism, Mechanism0}, {bootstrap_servers, BootstrapServers}]),
case Errors =:= [] of
true ->
Mechanism = parse_kafka_mechanism(Mechanism0),
{ok, #kafka_endpoint{
sasl_config = {Mechanism, Username, Password},
bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers),
topic = Topic
}};
false ->
{error, Errors}
end;
parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
Errors = lists:filtermap(fun(Term) ->
case check_kafka_argument(Term) of
ok ->
false;
{error, Error} ->
{true, Error}
end
end, [{topic, Topic}, {bootstrap_servers, BootstrapServers}]),
case Errors =:= [] of
true ->
{ok, #kafka_endpoint{
sasl_config = undefined,
bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers),
topic = Topic
}};
false ->
{error, Errors}
end;
parse_config(_, _) ->
{error, invalid_config}.
-spec parse_kafka_bootstrap_servers(BootstrapServers :: [binary()]) -> Servers :: [{Host :: string(), Port :: integer()}].
parse_kafka_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) ->
lists:map(fun(S) ->
[Host0, Port0] = binary:split(S, <<":">>),
{binary_to_list(Host0), binary_to_integer(Port0)}
end, BootstrapServers).
-spec parse_kafka_mechanism(Mechanism0 :: binary()) -> atom().
parse_kafka_mechanism(Mechanism0) when is_binary(Mechanism0) ->
case Mechanism0 of
<<"sha_256">> ->
scram_sha_256;
<<"sha_512">> ->
scram_sha_512;
<<"plain">> ->
plain;
_ ->
plain
end.
-spec check_kafka_argument(tuple()) -> ok | {error, Reason :: binary()}.
check_kafka_argument({mechanism, Mechanism}) ->
case lists:member(Mechanism, [<<"sha_256">>, <<"sha_512">>, <<"plain">>]) of
true ->
ok;
false ->
{error, <<"mechanism invalid">>}
end;
check_kafka_argument({username, Username}) ->
case Username /= <<>> of
true ->
ok;
false ->
{error, <<"username is empty">>}
end;
check_kafka_argument({password, Password}) ->
case Password /= <<>> of
true ->
ok;
false ->
{error, <<"password is empty">>}
end;
check_kafka_argument({topic, Topic}) ->
case Topic /= <<>> of
true ->
ok;
false ->
{error, <<"topic is empty">>}
end;
check_kafka_argument({bootstrap_servers, BootstrapServers}) ->
case is_list(BootstrapServers) andalso length(BootstrapServers) > 0 of
true ->
InvalidServers = lists:filtermap(fun(S) ->
case binary:split(S, <<":">>) of
[Host0, Port0] ->
Host = binary_to_list(Host0),
Port = binary_to_integer(Port0),
case not string:is_empty(Host) andalso (is_integer(Port) andalso Port > 0) of
true ->
false;
false ->
{true, S}
end;
_ ->
{true, S}
end
end, BootstrapServers),
case InvalidServers =:= [] of
true ->
ok;
false ->
{error, iolist_to_binary([<<"bootstrap_servers: ">>, lists:join(<<",">>, InvalidServers), <<" format is error">>])}
end;
false ->
{error, <<"bootstrap_servers is empty">>}
end.
-spec check_mqtt_argument(tuple()) -> ok | {error, Reason :: binary()}.
check_mqtt_argument({host, Host}) ->
case Host /= <<>> of
true ->
ok;
false ->
{error, <<"host is empty">>}
end;
check_mqtt_argument({port, Port}) ->
case is_integer(Port) andalso Port > 0 of
true ->
ok;
false ->
{error, <<"port invalid">>}
end;
check_mqtt_argument({username, Username}) ->
case Username /= <<>> of
true ->
ok;
false ->
{error, <<"username is empty">>}
end;
check_mqtt_argument({password, Password}) ->
case Password /= <<>> of
true ->
ok;
false ->
{error, <<"password is empty">>}
end;
check_mqtt_argument({topic, Topic}) ->
case Topic /= <<>> of
true ->
ok;
false ->
{error, <<"topic is empty">>}
end;
check_mqtt_argument({qos, Qos}) ->
case is_integer(Qos) andalso lists:member(Qos, [0, 1, 2]) of
true ->
ok;
false ->
{error, <<"qos invalid">>}
end.

View File

@ -98,6 +98,99 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
end
end;
%% http接口的测试
handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"http">>, <<"config">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(PoolSize), PoolSize > 0 ->
Body = <<"">>,
ContentType = "application/json",
case httpc:request(post, {Url, [], ContentType, Body}, [], []) of
{ok, _} ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[endpint_handler] test http: ~p, error: ~p", [Url, Reason]),
{ok, 200, iot_util:json_error(-1, <<"url failed">>)}
end;
%% mqtt
handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"mqtt">>, <<"config">> := Config}) ->
case endpoint:parse_config(<<"mqtt">>, Config) of
{ok, #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password}} ->
%% client
ClientId = "mqtt_client_test:" ++ iot_util:rand_bytes(16),
Opts = [
{owner, self()},
{clientid, ClientId},
{host, binary_to_list(Host)},
{port, Port},
{tcp_opts, []},
{username, binary_to_list(Username)},
{password, binary_to_list(Password)},
{keepalive, 86400},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{retry_interval, 5000}
],
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]),
case catch emqtt:connect(ConnPid, 5000) of
{ok, _} ->
lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]),
emqtt:stop(ConnPid),
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]),
emqtt:stop(ConnPid),
{ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)};
Error ->
lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]),
emqtt:stop(ConnPid),
{ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)}
end;
Other ->
lager:warning("[endpoint_handler] test connect mqtt with options: ~p, get error: ~p", [Opts, Other]),
{ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)}
end;
{error, Errors} ->
{ok, 200, iot_util:json_error(-1, Errors)}
end;
%% kafka
handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"kafka">>, <<"config">> := Config}) ->
case endpoint:parse_config(<<"kafka">>, Config) of
{ok, #kafka_endpoint{sasl_config = SaslConfig, bootstrap_servers = BootstrapServers, topic = Topic}} ->
BaseConfig = [
{reconnect_cool_down_seconds, 5},
{socket_options, [{keepalive, true}]}
],
ClientConfig = case SaslConfig of
{Mechanism, Username, Password} ->
[{sasl, {Mechanism, Username, Password}}|BaseConfig];
undefined ->
BaseConfig
end,
ClientId = list_to_atom("brod_client_test:" ++ iot_util:rand_bytes(16)),
case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, _ClientPid} ->
case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of
ok ->
ok = brod:stop_client(ClientId),
{ok, 200, iot_util:json_data(<<"ok">>)};
{error, Reason} ->
lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]),
{ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)}
end;
Error ->
lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Error]),
{ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)}
end;
{error, Errors} ->
{ok, 200, iot_util:json_error(-1, Errors)}
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.

View File

@ -26,6 +26,7 @@
erts,
runtime_tools,
observer,
inets,
kernel,
stdlib
]},

View File

@ -298,16 +298,16 @@ handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = D
RouteKey = get_route_key(RouteKey0),
endpoint_subscription:publish(RouteKey, ServiceId, Metric),
{keep_state, State};
%%
handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
{keep_state, State};
%% ping的数据是通过aes加密后的
handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}};
handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
{keep_state, State};
%%
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
{keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};

View File

@ -80,7 +80,7 @@ json_data(Data) ->
<<"result">> => Data
}, [force_utf8]).
json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) ->
json_error(ErrCode, ErrMessage) when is_integer(ErrCode) ->
jiffy:encode(#{
<<"error">> => #{
<<"code">> => ErrCode,