diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 89d1a1e..2e45372 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -14,7 +14,7 @@ -export([start_link/1]). -export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). -export([get_alias_pid/1, is_support/1, get_protocol/1]). --export([endpoint_record/1]). +-export([endpoint_record/1, parse_config/2]). %%%=================================================================== %%% API @@ -75,69 +75,212 @@ is_support(Protocol) when is_atom(Protocol) -> -spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson, <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> - try - Config = parse_config(Type, ConfigJson), - {ok, #endpoint { - id = Id, - matcher = Matcher, - title = Title, - config = Config, - status = Status, - updated_at = UpdatedAt, - created_at = CreatedAt - }} - catch throw:_ -> - error + case parse_config(Type, ConfigJson) of + {ok, Config} -> + {ok, #endpoint { + id = Id, + matcher = Matcher, + title = Title, + config = Config, + status = Status, + updated_at = UpdatedAt, + created_at = CreatedAt + }}; + {error, _Reason} -> + error end. +-spec parse_config(Protocol :: binary(), Config :: map()) -> {ok, #mqtt_endpoint{} | #kafka_endpoint{} | #http_endpoint{}} | {error, Errors :: [Error :: binary()]}. parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port0, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> Port = if is_binary(Port0) -> binary_to_integer(Port0); is_integer(Port0) -> Port0 end, - #mqtt_endpoint{ - host = Host, - port = Port, - client_id = ClientId, - username = Username, - password = Password, - topic = Topic, - qos = Qos - }; + + Errors = lists:filtermap(fun(Term) -> + case check_mqtt_argument(Term) of + ok -> + false; + {error, Error} -> + {true, Error} + end + end, [{host, Host}, {port, Port}, {username, Username}, {password, Password}, {topic, Topic}, {qos, Qos}]), + case Errors =:= [] of + true -> + {ok, #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }}; + false -> + {error, Errors} + end; parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> - #http_endpoint{ + {ok, #http_endpoint{ url = Url, pool_size = PoolSize - }; + }}; parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - Mechanism = case Mechanism0 of - <<"sha_256">> -> - scram_sha_256; - <<"sha_512">> -> - scram_sha_512; - <<"plain">> -> - plain; - _ -> - plain - end, - - #kafka_endpoint{ - sasl_config = {Mechanism, Username, Password}, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - #kafka_endpoint{ - sasl_config = undefined, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(_, _) -> - throw(invalid_config). - -parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> - lists:filtermap(fun(S) -> - case binary:split(S, <<":">>) of - [Host0, Port0] -> - {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; - _ -> - false + Errors = lists:filtermap(fun(Term) -> + case check_kafka_argument(Term) of + ok -> + false; + {error, Error} -> + {true, Error} end - end, BootstrapServers). \ No newline at end of file + end, [{username, Username}, {password, Password}, {topic, Topic}, {mechanism, Mechanism0}, {bootstrap_servers, BootstrapServers}]), + case Errors =:= [] of + true -> + Mechanism = parse_kafka_mechanism(Mechanism0), + {ok, #kafka_endpoint{ + sasl_config = {Mechanism, Username, Password}, + bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers), + topic = Topic + }}; + false -> + {error, Errors} + end; +parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + Errors = lists:filtermap(fun(Term) -> + case check_kafka_argument(Term) of + ok -> + false; + {error, Error} -> + {true, Error} + end + end, [{topic, Topic}, {bootstrap_servers, BootstrapServers}]), + case Errors =:= [] of + true -> + {ok, #kafka_endpoint{ + sasl_config = undefined, + bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers), + topic = Topic + }}; + false -> + {error, Errors} + end; +parse_config(_, _) -> + {error, invalid_config}. + +-spec parse_kafka_bootstrap_servers(BootstrapServers :: [binary()]) -> Servers :: [{Host :: string(), Port :: integer()}]. +parse_kafka_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> + lists:map(fun(S) -> + [Host0, Port0] = binary:split(S, <<":">>), + {binary_to_list(Host0), binary_to_integer(Port0)} + end, BootstrapServers). + +-spec parse_kafka_mechanism(Mechanism0 :: binary()) -> atom(). +parse_kafka_mechanism(Mechanism0) when is_binary(Mechanism0) -> + case Mechanism0 of + <<"sha_256">> -> + scram_sha_256; + <<"sha_512">> -> + scram_sha_512; + <<"plain">> -> + plain; + _ -> + plain + end. + +-spec check_kafka_argument(tuple()) -> ok | {error, Reason :: binary()}. +check_kafka_argument({mechanism, Mechanism}) -> + case lists:member(Mechanism, [<<"sha_256">>, <<"sha_512">>, <<"plain">>]) of + true -> + ok; + false -> + {error, <<"mechanism invalid">>} + end; +check_kafka_argument({username, Username}) -> + case Username /= <<>> of + true -> + ok; + false -> + {error, <<"username is empty">>} + end; +check_kafka_argument({password, Password}) -> + case Password /= <<>> of + true -> + ok; + false -> + {error, <<"password is empty">>} + end; +check_kafka_argument({topic, Topic}) -> + case Topic /= <<>> of + true -> + ok; + false -> + {error, <<"topic is empty">>} + end; +check_kafka_argument({bootstrap_servers, BootstrapServers}) -> + case is_list(BootstrapServers) andalso length(BootstrapServers) > 0 of + true -> + InvalidServers = lists:filtermap(fun(S) -> + case binary:split(S, <<":">>) of + [Host0, Port0] -> + Host = binary_to_list(Host0), + Port = binary_to_integer(Port0), + case not string:is_empty(Host) andalso (is_integer(Port) andalso Port > 0) of + true -> + false; + false -> + {true, S} + end; + _ -> + {true, S} + end + end, BootstrapServers), + + case InvalidServers =:= [] of + true -> + ok; + false -> + {error, iolist_to_binary([<<"bootstrap_servers: ">>, lists:join(<<",">>, InvalidServers), <<" format is error">>])} + end; + false -> + {error, <<"bootstrap_servers is empty">>} + end. + +-spec check_mqtt_argument(tuple()) -> ok | {error, Reason :: binary()}. +check_mqtt_argument({host, Host}) -> + case Host /= <<>> of + true -> + ok; + false -> + {error, <<"host is empty">>} + end; +check_mqtt_argument({port, Port}) -> + case is_integer(Port) andalso Port > 0 of + true -> + ok; + false -> + {error, <<"port invalid">>} + end; +check_mqtt_argument({username, Username}) -> + case Username /= <<>> of + true -> + ok; + false -> + {error, <<"username is empty">>} + end; +check_mqtt_argument({password, Password}) -> + case Password /= <<>> of + true -> + ok; + false -> + {error, <<"password is empty">>} + end; +check_mqtt_argument({topic, Topic}) -> + case Topic /= <<>> of + true -> + ok; + false -> + {error, <<"topic is empty">>} + end; +check_mqtt_argument({qos, Qos}) -> + case is_integer(Qos) andalso lists:member(Qos, [0, 1, 2]) of + true -> + ok; + false -> + {error, <<"qos invalid">>} + end. \ No newline at end of file diff --git a/apps/iot/src/http_handlers/endpoint_handler.erl b/apps/iot/src/http_handlers/endpoint_handler.erl index cee9c93..de9c215 100644 --- a/apps/iot/src/http_handlers/endpoint_handler.erl +++ b/apps/iot/src/http_handlers/endpoint_handler.erl @@ -98,6 +98,99 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege end end; +%% http接口的测试 +handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"http">>, <<"config">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(PoolSize), PoolSize > 0 -> + Body = <<"">>, + ContentType = "application/json", + case httpc:request(post, {Url, [], ContentType, Body}, [], []) of + {ok, _} -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:debug("[endpint_handler] test http: ~p, error: ~p", [Url, Reason]), + {ok, 200, iot_util:json_error(-1, <<"url failed">>)} + end; + +%% 测试mqtt +handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"mqtt">>, <<"config">> := Config}) -> + case endpoint:parse_config(<<"mqtt">>, Config) of + {ok, #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password}} -> + %% 用户测试的client + ClientId = "mqtt_client_test:" ++ iot_util:rand_bytes(16), + Opts = [ + {owner, self()}, + {clientid, ClientId}, + {host, binary_to_list(Host)}, + {port, Port}, + {tcp_opts, []}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]), + case catch emqtt:connect(ConnPid, 5000) of + {ok, _} -> + lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]), + emqtt:stop(ConnPid), + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]), + emqtt:stop(ConnPid), + {ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)}; + Error -> + lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]), + emqtt:stop(ConnPid), + {ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)} + end; + Other -> + lager:warning("[endpoint_handler] test connect mqtt with options: ~p, get error: ~p", [Opts, Other]), + {ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)} + end; + {error, Errors} -> + {ok, 200, iot_util:json_error(-1, Errors)} + end; + +%% 测试kafka +handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"kafka">>, <<"config">> := Config}) -> + case endpoint:parse_config(<<"kafka">>, Config) of + {ok, #kafka_endpoint{sasl_config = SaslConfig, bootstrap_servers = BootstrapServers, topic = Topic}} -> + BaseConfig = [ + {reconnect_cool_down_seconds, 5}, + {socket_options, [{keepalive, true}]} + ], + + ClientConfig = case SaslConfig of + {Mechanism, Username, Password} -> + [{sasl, {Mechanism, Username, Password}}|BaseConfig]; + undefined -> + BaseConfig + end, + ClientId = list_to_atom("brod_client_test:" ++ iot_util:rand_bytes(16)), + + case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of + {ok, _ClientPid} -> + case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of + ok -> + ok = brod:stop_client(ClientId), + {ok, 200, iot_util:json_data(<<"ok">>)}; + {error, Reason} -> + lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]), + {ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)} + end; + Error -> + lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Error]), + {ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)} + end; + {error, Errors} -> + {ok, 200, iot_util:json_error(-1, Errors)} + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 0073eb7..09b7e59 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -26,6 +26,7 @@ erts, runtime_tools, observer, + inets, kernel, stdlib ]}, diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 14d30f5..e3d6c30 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -298,16 +298,16 @@ handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = D RouteKey = get_route_key(RouteKey0), endpoint_subscription:publish(RouteKey, ServiceId, Metric), {keep_state, State}; +%% 事件流 +handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> + lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), + {keep_state, State}; %% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行 handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), {keep_state, State#state{metrics = Metrics}}; -handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> - lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), - {keep_state, State}; - %% 心跳机制 handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) -> {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}}; diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index 5267216..fc568dd 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -80,7 +80,7 @@ json_data(Data) -> <<"result">> => Data }, [force_utf8]). -json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) -> +json_error(ErrCode, ErrMessage) when is_integer(ErrCode) -> jiffy:encode(#{ <<"error">> => #{ <<"code">> => ErrCode,