diff --git a/apps/iot/src/influxdb/influx_client.erl b/apps/iot/src/influxdb/influx_client.erl index 30bbc24..ffe16b8 100644 --- a/apps/iot/src/influxdb/influx_client.erl +++ b/apps/iot/src/influxdb/influx_client.erl @@ -12,9 +12,10 @@ -behaviour(gen_server). %% API --export([start_link/1, write/4, write/5, write_data/4]). +-export([start_link/1, write/4, write/5, write_data/4, create_bucket/3]). -export([get_precision/1]). -export([get_bucket/1]). +-export([create_all_buckets/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -93,11 +94,24 @@ get_bucket(DeviceUUID) when is_binary(DeviceUUID) -> write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) -> write(Pid, Bucket, Org, <<"ms">>, Points). +-spec create_all_buckets(SuperDeviceUUIDs :: list()) -> no_return(). +create_all_buckets(SuperDeviceUUIDs) when is_list(SuperDeviceUUIDs) -> + poolboy:transaction(influx_pool, fun(Pid) -> + lists:foreach(fun(DeviceUUID) -> + Bucket = <<"metric_", DeviceUUID/binary>>, + influx_client:create_bucket(Pid, ?DEFAULT_ORG, Bucket) + end, SuperDeviceUUIDs) + end). + %% Precision的值为: ms|ns|s; 表示时间的精度,默认为毫秒(ms) -spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Precision :: binary(), Points :: list()) -> no_return(). write(Pid, Bucket, Org, Precision, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_binary(Precision), is_list(Points) -> gen_server:cast(Pid, {write, Bucket, Org, Precision, Points}). +-spec create_bucket(Pid :: pid(), Org :: binary(), Bucket :: binary()) -> ok(). +create_bucket(Pid, Org, Bucket) when is_pid(Pid), is_binary(Org), is_binary(Bucket) -> + gen_server:call(Pid, {create_bucket, Org, Bucket}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Opts :: list()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -130,6 +144,48 @@ init([InfluxProps]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). +handle_call({create_bucket, Org, Bucket}, _From, State = #state{host = Host, port = Port, token = Token}) -> + %% 处理headers + Headers = [ + {<<"Content-Type">>, <<"application/json">>}, + {<<"Authorization">>, <<"Token ", Token/binary>>} + ], + + Req = #{ + <<"name">> => Bucket, + <<"orgID">> => Org, + <<"retentionRules">> => [ + #{ + <<"type">> => <<"expire">>, + <<"everySeconds">> => 7776000 + } + ] + }, + Body = iolist_to_binary(jiffy:encode(Req, [force_utf8])), + + Url = uri_string:normalize(#{ + scheme => "http", + host => Host, + port => Port, + path => "/api/v2/buckets" + }), + + lager:debug("[influx_client] url is: ~p, headers: ~p, body: ~ts", [Url, Headers, Body]), + case hackney:request(post, Url, Headers, Body, [{pool, false}]) of + {ok, StatusCode, _RespHeaders, ClientRef} -> + case hackney:body(ClientRef) of + {ok, RespBody} -> + lager:debug("[influx_client] create_bucket status_code: ~p, response body is: ~p", [StatusCode, RespBody]); + {error, Error} -> + lager:warning("[influx_client] create_bucket status_code: ~p, error is: ~p", [Error]) + end, + hackney:close(ClientRef); + {error, Reason} -> + lager:warning("[influx_client] create_bucket result is: ~p", [Reason]) + end, + + {reply, ok, State}; + handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index ea7eee3..7bffca8 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -27,6 +27,10 @@ start(_StartType, _StartArgs) -> %% 启动http服务 start_http_server(), + %% 创建特殊的buckets + SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []), + influx_client:create_all_buckets(SuperDeviceUUIDs), + {ok, SupPid}. stop(_State) ->