bat write influxdb
This commit is contained in:
parent
4883a5d519
commit
eaed497426
@ -1,5 +1,29 @@
|
|||||||
[
|
[
|
||||||
{super_device_uuids, [
|
{super_device_uuids, [
|
||||||
|
<<"30114946327406592017252591921319">>,
|
||||||
|
<<"30114747234700902417252587174574">>,
|
||||||
|
<<"30114595624033075217252583559890">>,
|
||||||
|
<<"30114100406754099217252571752996">>,
|
||||||
|
<<"30114013466381926417252569680176">>,
|
||||||
|
<<"30008218963725926417250047342671">>,
|
||||||
|
<<"30008101126013337617250044533208">>,
|
||||||
|
<<"30007942149728256017250040742912">>,
|
||||||
|
<<"30007735838625382417250035824079">>,
|
||||||
|
<<"30007384475487846417250027446924">>,
|
||||||
|
<<"30006962058743398417250017375721">>,
|
||||||
|
<<"30006822479436595217250014047898">>,
|
||||||
|
<<"30006688212215808017250010846715">>,
|
||||||
|
<<"30006604734241587217250008856441">>,
|
||||||
|
<<"30006271076938956817250000901434">>,
|
||||||
|
<<"30006167699508838417249998436728">>,
|
||||||
|
<<"30005985602122956817249994095184">>,
|
||||||
|
<<"30005889040436838417249991792972">>,
|
||||||
|
<<"30005793894328320017249989524512">>,
|
||||||
|
<<"30005663919558246417249986425677">>,
|
||||||
|
<<"30005503095682252817249982591332">>,
|
||||||
|
<<"29975356847319040017249263848766">>,
|
||||||
|
<<"29974871910698598417249252286976">>,
|
||||||
|
<<"29973464758616473617249218737857">>,
|
||||||
<<"29717641877153792017243119445063">>
|
<<"29717641877153792017243119445063">>
|
||||||
]}
|
]}
|
||||||
].
|
].
|
||||||
@ -20,11 +20,16 @@
|
|||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
%% buffer的数量
|
||||||
|
-define(BUFFER_SIZE, 1500).
|
||||||
|
|
||||||
-define(DEFAULT_BUCKET, <<"metric">>).
|
-define(DEFAULT_BUCKET, <<"metric">>).
|
||||||
-define(DEFAULT_ORG, <<"nannong">>).
|
-define(DEFAULT_ORG, <<"nannong">>).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
pool_pid :: pid()
|
pool_pid :: pid(),
|
||||||
|
%% 缓冲区, 格式: #{bucket => []}
|
||||||
|
buffers = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -39,23 +44,24 @@ write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement)
|
|||||||
#{<<"key">> := Key} ->
|
#{<<"key">> := Key} ->
|
||||||
Values = maps:remove(<<"key">>, Fields),
|
Values = maps:remove(<<"key">>, Fields),
|
||||||
NFields = #{Key => Values},
|
NFields = #{Key => Values},
|
||||||
Point = influx_point:new(Measurement, Tags, NFields, Timestamp),
|
Point = influx_point:new(Measurement, Tags, NFields, format_timestamp(Timestamp)),
|
||||||
[Point];
|
[Point];
|
||||||
_ ->
|
_ ->
|
||||||
[]
|
[]
|
||||||
end
|
end
|
||||||
end, FieldsList),
|
end, FieldsList),
|
||||||
|
|
||||||
Precision = case length(integer_to_list(Timestamp)) of
|
|
||||||
10 -> <<"s">>;
|
|
||||||
13 -> <<"ms">>;
|
|
||||||
16 -> <<"u">>;
|
|
||||||
19 -> <<"ns">>;
|
|
||||||
_ -> <<"ms">>
|
|
||||||
end,
|
|
||||||
Bucket = get_bucket(Measurement),
|
Bucket = get_bucket(Measurement),
|
||||||
|
|
||||||
gen_server:cast(?SERVER, {write_data, Bucket, ?DEFAULT_ORG, Precision, Points}).
|
gen_server:cast(?SERVER, {write_data, Bucket, Points}).
|
||||||
|
|
||||||
|
format_timestamp(Timestamp) when is_integer(Timestamp) ->
|
||||||
|
case length(integer_to_list(Timestamp)) of
|
||||||
|
10 -> Timestamp * 1000;
|
||||||
|
13 -> Timestamp;
|
||||||
|
16 -> Timestamp div 1000;
|
||||||
|
19 -> Timestamp div 1000_000;
|
||||||
|
_ -> Timestamp
|
||||||
|
end.
|
||||||
|
|
||||||
-spec get_bucket(DeviceUUID :: binary()) -> binary().
|
-spec get_bucket(DeviceUUID :: binary()) -> binary().
|
||||||
get_bucket(DeviceUUID) when is_binary(DeviceUUID) ->
|
get_bucket(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||||
@ -94,6 +100,9 @@ init([]) ->
|
|||||||
Result = catch create_buckets(PoolPid, SuperDeviceUUIDs),
|
Result = catch create_buckets(PoolPid, SuperDeviceUUIDs),
|
||||||
lager:debug("[influx_client_pool] create buckets result is: ~p", [Result]),
|
lager:debug("[influx_client_pool] create buckets result is: ~p", [Result]),
|
||||||
|
|
||||||
|
%% 定时刷新逻辑
|
||||||
|
erlang:start_timer(5000, self(), flush_ticker),
|
||||||
|
|
||||||
{ok, #state{pool_pid = PoolPid}}.
|
{ok, #state{pool_pid = PoolPid}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -115,11 +124,20 @@ handle_call(_Request, _From, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_cast({write_data, _Bucket, _Org, _Precision, []}, State) ->
|
handle_cast({write_data, _Bucket, []}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_cast({write_data, Bucket, Org, Precision, Points}, State = #state{pool_pid = PoolPid}) ->
|
handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buffers = Buffers}) ->
|
||||||
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, Org, Precision, Points) end),
|
Q = maps:get(Bucket, Buffers, queue:new()),
|
||||||
{noreply, State}.
|
NQ = lists:foldl(fun(Point, Q0) -> queue:in(Point, Q0) end, Q, Points),
|
||||||
|
|
||||||
|
%% 超过缓冲区设置的大小则批量导入
|
||||||
|
case queue:len(NQ) >= ?BUFFER_SIZE of
|
||||||
|
true ->
|
||||||
|
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end),
|
||||||
|
{noreply, State#state{buffers = maps:put(Bucket, queue:new(), Buffers)}};
|
||||||
|
false ->
|
||||||
|
{noreply, State#state{buffers = maps:put(Bucket, NQ, Buffers)}}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling all non call/cast messages
|
%% @doc Handling all non call/cast messages
|
||||||
@ -127,6 +145,20 @@ handle_cast({write_data, Bucket, Org, Precision, Points}, State = #state{pool_pi
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_info({timeout, _, flush_ticker}, State = #state{buffers = Buffers, pool_pid = PoolPid}) ->
|
||||||
|
erlang:start_timer(5000, self(), flush_ticker),
|
||||||
|
|
||||||
|
lists:foreach(fun({Bucket, Q}) ->
|
||||||
|
case queue:len(Q) > 0 of
|
||||||
|
true ->
|
||||||
|
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end, maps:to_list(Buffers)),
|
||||||
|
|
||||||
|
{noreply, State#state{buffers = #{}}};
|
||||||
|
|
||||||
handle_info(_Info, State = #state{}) ->
|
handle_info(_Info, State = #state{}) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user