simple influxdb

This commit is contained in:
anlicheng 2025-04-08 11:22:40 +08:00
parent 80c3508072
commit ba6b7d9f43

View File

@ -149,18 +149,10 @@ handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buf
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, flush_ticker}, State = #state{buffers = Buffers, pool_pid = PoolPid}) ->
erlang:start_timer(5000, self(), flush_ticker),
Acc = lists:foldl(fun({Bucket, Q}, Acc0) ->
[begin
Len = queue:len(Q),
case Len > 0 of
true ->
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end),
Acc0 + Len;
false ->
Acc0
end
end, 0, maps:to_list(Buffers)),
lager:debug("[influx_client_pool] flush_ticker acc write num: ~p", [Acc]),
Len > 0 andalso poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end)
end || {Bucket, Q} <- maps:to_list(Buffers)],
{noreply, State#state{buffers = #{}}};