code format
This commit is contained in:
parent
d8ce62f42c
commit
40421eddc8
@ -44,12 +44,7 @@ handle_request("GET", "/endpoint/info", #{<<"name">> := Name}, _) ->
|
||||
%% 重新加载对应的主机信息
|
||||
handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) ->
|
||||
RequiredFields = [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>],
|
||||
case lists:all(fun(Key) -> maps:is_key(Key, Params) end, RequiredFields) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
throw(<<"missed required param">>)
|
||||
end,
|
||||
iot_util:assert(lists:all(fun(Key) -> maps:is_key(Key, Params) end, RequiredFields), <<"missed required param">>),
|
||||
|
||||
case mnesia_endpoint:get_endpoint(Name) of
|
||||
undefined ->
|
||||
@ -144,57 +139,32 @@ make_endpoint([{<<"config">>, #{<<"protocol">> := <<"https">>, <<"args">> := #{<
|
||||
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
|
||||
make_endpoint(Params, Endpoint#endpoint{config = #ws_endpoint{url = Url}});
|
||||
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint) ->
|
||||
if
|
||||
not is_binary(Username) orelse Username =:= <<>> ->
|
||||
throw("username is invalid");
|
||||
not is_binary(Password) orelse Password =:= <<>> ->
|
||||
throw("password is invalid");
|
||||
not (is_list(BootstrapServers) andalso length(BootstrapServers) > 0) ->
|
||||
throw("bootstrap_servers is invalid");
|
||||
not is_binary(Topic) orelse Topic =:= <<>> ->
|
||||
throw("topic is invalid");
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>),
|
||||
iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>),
|
||||
iot_util:assert(is_list(BootstrapServers) andalso length(BootstrapServers) > 0, <<"bootstrap_servers is invalid">>),
|
||||
iot_util:assert(is_binary(Topic) andalso Topic /= <<>>, <<"topic is invalid">>),
|
||||
|
||||
make_endpoint(Params, Endpoint#endpoint{config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}});
|
||||
|
||||
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint) ->
|
||||
if
|
||||
not is_binary(Username) orelse Username =:= <<>> ->
|
||||
throw("username is invalid");
|
||||
not is_binary(Password) orelse Password =:= <<>> ->
|
||||
throw("password is invalid");
|
||||
not is_binary(Topic) orelse Topic =:= <<>> ->
|
||||
throw("topic is invalid");
|
||||
not is_binary(Host) orelse Host =:= <<>> ->
|
||||
throw("host is invalid");
|
||||
not is_integer(Port) orelse Port =< 0 ->
|
||||
throw("port is invalid");
|
||||
not (Qos == 0 orelse Qos == 1 orelse Qos == 2) ->
|
||||
throw("qos is invalid");
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>),
|
||||
iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>),
|
||||
iot_util:assert(is_binary(Topic) andalso Topic /= <<>>, <<"topic is invalid">>),
|
||||
iot_util:assert(is_binary(Host) andalso Host /= <<>>, <<"host is invalid">>),
|
||||
iot_util:assert(is_integer(Port) andalso Port > 0, <<"port is invalid">>),
|
||||
iot_util:assert(Qos == 0 orelse Qos == 1 orelse Qos == 2, <<"qos is invalid">>),
|
||||
|
||||
make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}});
|
||||
|
||||
%% mysql的支持
|
||||
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint) ->
|
||||
if
|
||||
not is_binary(Username) orelse Username =:= <<>> ->
|
||||
throw("username is invalid");
|
||||
not is_binary(Password) orelse Password =:= <<>> ->
|
||||
throw("password is invalid");
|
||||
not is_binary(Host) orelse Host =:= <<>> ->
|
||||
throw("host is invalid");
|
||||
not is_integer(Port) orelse Port =< 0 ->
|
||||
throw("port is invalid");
|
||||
not is_binary(Database) orelse Database =:= <<>> ->
|
||||
throw("database is invalid");
|
||||
not is_binary(TableName) orelse TableName =:= <<>> ->
|
||||
throw("table_name is invalid");
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>),
|
||||
iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>),
|
||||
iot_util:assert(is_binary(Host) andalso Host /= <<>>, <<"host is invalid">>),
|
||||
iot_util:assert(is_integer(Port) andalso Port > 0, <<"port is invalid">>),
|
||||
iot_util:assert(is_binary(Database) andalso Database /= <<>>, <<"database is invalid">>),
|
||||
iot_util:assert(is_binary(TableName) andalso TableName /= <<>>, <<"table_name is invalid">>),
|
||||
|
||||
make_endpoint(Params, Endpoint#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}});
|
||||
|
||||
make_endpoint([{<<"config">>, Config} | _], _) ->
|
||||
|
||||
@ -37,9 +37,9 @@ handle_request("POST", "/host/reload", _, #{<<"uuid">> := UUID}) when is_binary(
|
||||
lager:debug("[host_handler] will reload host uuid: ~p", [UUID]),
|
||||
case iot_host_sup:ensured_host_started(UUID) of
|
||||
{ok, Pid} when is_pid(Pid) ->
|
||||
lager:debug("[host_handler] already_started reload host uuid: ~p, success", [UUID]),
|
||||
case iot_host:reload(Pid) of
|
||||
ok ->
|
||||
lager:debug("[host_handler] already_started reload host uuid: ~p, success", [UUID]),
|
||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
||||
{error, ReloadError} ->
|
||||
lager:debug("[host_handler] reload host uuid: ~p, error: ~p", [UUID, ReloadError]),
|
||||
@ -52,10 +52,9 @@ handle_request("POST", "/host/reload", _, #{<<"uuid">> := UUID}) when is_binary(
|
||||
|
||||
%% 删除对应的主机信息
|
||||
handle_request("POST", "/host/delete", _, #{<<"uuid">> := UUID}) when is_binary(UUID) ->
|
||||
lager:debug("[host_handler] will delete host uuid: ~p", [UUID]),
|
||||
|
||||
case iot_host_sup:delete_host(UUID) of
|
||||
ok ->
|
||||
lager:debug("[host_handler] will delete host uuid: ~p", [UUID]),
|
||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
||||
{error, Reason} ->
|
||||
lager:debug("[host_handler] delete host uuid: ~p, get error is: ~p", [UUID, Reason]),
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/1, write/4, write/5, test/0]).
|
||||
-export([start_link/1, write/4, write/5]).
|
||||
-export([get_precision/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
@ -32,20 +32,6 @@
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
test() ->
|
||||
UUID = <<"device123123">>,
|
||||
|
||||
lists:foreach(fun(Id) ->
|
||||
Point = influx_point:new(<<"shui_biao">>,
|
||||
[{<<"uuid">>, UUID}, {<<"service_name">>, <<"shui_biao">>}],
|
||||
[{<<"cost">>, Id}],
|
||||
iot_util:timestamp()),
|
||||
|
||||
poolboy:transaction(influx_pool, fun(Pid) ->
|
||||
write(Pid, <<"iot">>, <<"iot">>, [Point])
|
||||
end)
|
||||
end, lists:seq(1, 100)).
|
||||
|
||||
%% 获取时间标识符号
|
||||
-spec get_precision(Timestamp :: integer()) -> binary().
|
||||
get_precision(Timestamp) when is_integer(Timestamp) ->
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
-export([timestamp/0, number_format/2, current_time/0, timestamp_of_seconds/0, float_to_binary/2, int_format/2]).
|
||||
-export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]).
|
||||
-export([json_data/1, json_error/2]).
|
||||
-export([queue_limited_in/3, assert_call/2]).
|
||||
-export([queue_limited_in/3, assert_call/2, assert/2]).
|
||||
|
||||
%% 时间,精确到毫秒
|
||||
timestamp() ->
|
||||
@ -130,4 +130,11 @@ float_to_binary(V, _) when is_integer(V) ->
|
||||
integer_to_binary(V);
|
||||
float_to_binary(V, Decimals) when is_float(V), is_integer(Decimals) ->
|
||||
S = float_to_list(V, [{decimals, Decimals}, compact]),
|
||||
list_to_binary(S).
|
||||
list_to_binary(S).
|
||||
|
||||
assert(true, _) ->
|
||||
ok;
|
||||
assert(false, F) when is_function(F) ->
|
||||
F();
|
||||
assert(false, Msg) ->
|
||||
throw(Msg).
|
||||
@ -82,12 +82,35 @@ to_map(#endpoint{name = Name, title = Title, matcher = Matcher, mapper = Mapper,
|
||||
}.
|
||||
|
||||
config_map(#http_endpoint{url = Url}) ->
|
||||
#{<<"protocol">> => <<"http">>, <<"args">> => #{<<"url">> => Url}};
|
||||
#{<<"protocol">> => <<"http">>, <<"args">> => #{
|
||||
<<"url">> => Url
|
||||
}};
|
||||
config_map(#ws_endpoint{url = Url}) ->
|
||||
#{<<"protocol">> => <<"ws">>, <<"args">> => #{<<"url">> => Url}};
|
||||
#{<<"protocol">> => <<"ws">>, <<"args">> => #{
|
||||
<<"url">> => Url
|
||||
}};
|
||||
config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
|
||||
#{<<"protocol">> => <<"kafka">>, <<"args">> => #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}};
|
||||
#{<<"protocol">> => <<"kafka">>, <<"args">> => #{
|
||||
<<"username">> => Username,
|
||||
<<"password">> => Password,
|
||||
<<"bootstrap_servers">> => BootstrapServers,
|
||||
<<"topic">> => Topic
|
||||
}};
|
||||
config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
|
||||
#{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}};
|
||||
#{<<"protocol">> => <<"mqtt">>, <<"args">> => #{
|
||||
<<"host">> => Host,
|
||||
<<"port">> => Port,
|
||||
<<"username">> => Username,
|
||||
<<"password">> => Password,
|
||||
<<"topic">> => Topic,
|
||||
<<"qos">> => Qos
|
||||
}};
|
||||
config_map(#mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}) ->
|
||||
#{<<"protocol">> => <<"mysql">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"database">> => Database, <<"table_name">> => TableName}}.
|
||||
#{<<"protocol">> => <<"mysql">>, <<"args">> => #{
|
||||
<<"host">> => Host,
|
||||
<<"port">> => Port,
|
||||
<<"username">> => Username,
|
||||
<<"password">> => Password,
|
||||
<<"database">> => Database,
|
||||
<<"table_name">> => TableName
|
||||
}}.
|
||||
|
||||
@ -13,7 +13,21 @@
|
||||
%% API
|
||||
-export([rsa_encode/1]).
|
||||
-export([insert_services/1]).
|
||||
-export([insert_endpoints/0, test_http/0, test_mysql/0, test_mqtt/0]).
|
||||
-export([insert_endpoints/0, test_http/0, test_mysql/0, test_mqtt/0, test_influxdb/0]).
|
||||
|
||||
test_influxdb() ->
|
||||
UUID = <<"device123123">>,
|
||||
|
||||
lists:foreach(fun(Id) ->
|
||||
Point = influx_point:new(<<"shui_biao">>,
|
||||
[{<<"uuid">>, UUID}, {<<"service_name">>, <<"shui_biao">>}],
|
||||
[{<<"cost">>, Id}],
|
||||
iot_util:timestamp()),
|
||||
|
||||
poolboy:transaction(influx_pool, fun(Pid) ->
|
||||
influx_client:write(Pid, <<"iot">>, <<"iot">>, [Point])
|
||||
end)
|
||||
end, lists:seq(1, 100)).
|
||||
|
||||
test_http() ->
|
||||
Name = <<"zhongguodianli">>,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user