This commit is contained in:
anlicheng 2023-08-08 17:00:02 +08:00
parent 373522a976
commit 8ec8b054df

View File

@ -133,30 +133,58 @@ make_endpoint([{<<"config">>, #{<<"protocol">> := <<"https">>, <<"args">> := #{<
make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{config = #ws_endpoint{url = Url}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint)
when is_binary(Username) andalso Username /= <<>>
andalso is_binary(Password) andalso Password /= <<>>
andalso is_list(BootstrapServers) andalso length(BootstrapServers) > 0
andalso is_binary(Topic) andalso Topic /= <<>> ->
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint) ->
if
not is_binary(Username) orelse Username =:= <<>> ->
throw("username is invalid");
not is_binary(Password) orelse Password =:= <<>> ->
throw("password is invalid");
not (is_list(BootstrapServers) andalso length(BootstrapServers) > 0) ->
throw("bootstrap_servers is invalid");
not is_binary(Topic) orelse Topic =:= <<>> ->
throw("topic is invalid");
true ->
ok
end,
make_endpoint(Params, Endpoint#endpoint{config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint)
when is_binary(Username) andalso Username /= <<>>
andalso is_binary(Password) andalso Password /= <<>>
andalso is_binary(Host) andalso Host /= <<>>
andalso is_integer(Port) andalso Port > 0
andalso (Qos == 0 orelse Qos == 1 orelse Qos == 2)
andalso is_binary(Topic) andalso Topic /= <<>> ->
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint) ->
if
not is_binary(Username) orelse Username =:= <<>> ->
throw("username is invalid");
not is_binary(Password) orelse Password =:= <<>> ->
throw("password is invalid");
not is_binary(Topic) orelse Topic =:= <<>> ->
throw("topic is invalid");
not is_binary(Host) orelse Host =:= <<>> ->
throw("host is invalid");
not is_integer(Port) orelse Port =< 0 ->
throw("port is invalid");
not (Qos == 0 orelse Qos == 1 orelse Qos == 2) ->
throw("qos is invalid");
true ->
ok
end,
make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}});
%% mysql的支持
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"databse">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint)
when is_binary(Username) andalso Username /= <<>>
andalso is_binary(Password) andalso Password /= <<>>
andalso is_binary(Host) andalso Host /= <<>>
andalso is_integer(Port) andalso Port > 0
andalso is_binary(Database) andalso Database /= <<>>
andalso is_binary(TableName) andalso TableName /= <<>> ->
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"databse">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint) ->
if
not is_binary(Username) orelse Username =:= <<>> ->
throw("username is invalid");
not is_binary(Password) orelse Password =:= <<>> ->
throw("password is invalid");
not is_binary(Host) orelse Host =:= <<>> ->
throw("host is invalid");
not is_integer(Port) orelse Port =< 0 ->
throw("port is invalid");
not is_binary(Database) orelse Database =:= <<>> ->
throw("database is invalid");
not is_binary(TableName) orelse TableName =:= <<>> ->
throw("table_name is invalid");
true ->
ok
end,
make_endpoint(Params, Endpoint#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}});
make_endpoint([{<<"config">>, Config} | _], _) ->