add influx debug info
This commit is contained in:
parent
eaed497426
commit
7cf373e04f
@ -195,18 +195,18 @@ handle_cast({write, Bucket, Org, Precision, Points}, State = #state{host = Host,
|
|||||||
query => Query
|
query => Query
|
||||||
}),
|
}),
|
||||||
|
|
||||||
lager:debug("[influx_client] url is: ~p, headers: ~p, body: ~ts", [Url, Headers, Body]),
|
|
||||||
case hackney:request(post, Url, Headers, GZipBody, [{pool, false}]) of
|
case hackney:request(post, Url, Headers, GZipBody, [{pool, false}]) of
|
||||||
{ok, StatusCode, _RespHeaders, ClientRef} ->
|
{ok, StatusCode, _RespHeaders, ClientRef} ->
|
||||||
case hackney:body(ClientRef) of
|
case hackney:body(ClientRef) of
|
||||||
{ok, RespBody} ->
|
{ok, RespBody} ->
|
||||||
lager:debug("[influx_client] status_code: ~p, response body is: ~p", [StatusCode, RespBody]);
|
lager:debug("[influx_client] url: ~p, lines: ~p, first line: ~ts, status_code: ~p, response is: ~p",
|
||||||
|
[Url, length(Points), hd(PointLines), StatusCode, RespBody]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:warning("[influx_client] status_code: ~p, error is: ~p", [Error])
|
lager:warning("[influx_client] url: ~p, status_code: ~p, error is: ~p", [Url, Error])
|
||||||
end,
|
end,
|
||||||
hackney:close(ClientRef);
|
hackney:close(ClientRef);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:warning("[influx_client] request result is: ~p", [Reason])
|
lager:warning("[influx_client] url: ~p, request result is: ~p", [Url, Reason])
|
||||||
end,
|
end,
|
||||||
|
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|||||||
@ -131,8 +131,10 @@ handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buf
|
|||||||
NQ = lists:foldl(fun(Point, Q0) -> queue:in(Point, Q0) end, Q, Points),
|
NQ = lists:foldl(fun(Point, Q0) -> queue:in(Point, Q0) end, Q, Points),
|
||||||
|
|
||||||
%% 超过缓冲区设置的大小则批量导入
|
%% 超过缓冲区设置的大小则批量导入
|
||||||
case queue:len(NQ) >= ?BUFFER_SIZE of
|
Len = queue:len(NQ),
|
||||||
|
case Len >= ?BUFFER_SIZE of
|
||||||
true ->
|
true ->
|
||||||
|
lager:debug("[influx_client_pool] bucket: ~p, flush buffer size: ~p", [Bucket, Len]),
|
||||||
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end),
|
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end),
|
||||||
{noreply, State#state{buffers = maps:put(Bucket, queue:new(), Buffers)}};
|
{noreply, State#state{buffers = maps:put(Bucket, queue:new(), Buffers)}};
|
||||||
false ->
|
false ->
|
||||||
@ -148,14 +150,17 @@ handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buf
|
|||||||
handle_info({timeout, _, flush_ticker}, State = #state{buffers = Buffers, pool_pid = PoolPid}) ->
|
handle_info({timeout, _, flush_ticker}, State = #state{buffers = Buffers, pool_pid = PoolPid}) ->
|
||||||
erlang:start_timer(5000, self(), flush_ticker),
|
erlang:start_timer(5000, self(), flush_ticker),
|
||||||
|
|
||||||
lists:foreach(fun({Bucket, Q}) ->
|
Acc = lists:foldl(fun({Bucket, Q}, Acc0) ->
|
||||||
case queue:len(Q) > 0 of
|
Len = queue:len(Q),
|
||||||
|
case Len > 0 of
|
||||||
true ->
|
true ->
|
||||||
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end);
|
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end),
|
||||||
|
Acc0 + Len;
|
||||||
false ->
|
false ->
|
||||||
ok
|
Acc0
|
||||||
end
|
end
|
||||||
end, maps:to_list(Buffers)),
|
end, 0, maps:to_list(Buffers)),
|
||||||
|
lager:debug("[influx_client_pool] flush_ticker acc write num: ~p", [Acc]),
|
||||||
|
|
||||||
{noreply, State#state{buffers = #{}}};
|
{noreply, State#state{buffers = #{}}};
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user