diff --git a/apps/iot/src/influxdb/influx_client.erl b/apps/iot/src/influxdb/influx_client.erl
index c16379f..ebdf7d4 100644
--- a/apps/iot/src/influxdb/influx_client.erl
+++ b/apps/iot/src/influxdb/influx_client.erl
@@ -195,18 +195,18 @@ handle_cast({write, Bucket, Org, Precision, Points}, State = #state{host = Host,
         query => Query
     }),
-    lager:debug("[influx_client] url is: ~p, headers: ~p, body: ~ts", [Url, Headers, Body]),
     case hackney:request(post, Url, Headers, GZipBody, [{pool, false}]) of
         {ok, StatusCode, _RespHeaders, ClientRef} ->
             case hackney:body(ClientRef) of
                 {ok, RespBody} ->
-                    lager:debug("[influx_client] status_code: ~p, response body is: ~p", [StatusCode, RespBody]);
+                    lager:debug("[influx_client] url: ~p, lines: ~p, first line: ~ts, status_code: ~p, response is: ~p",
+                        [Url, length(Points), hd(PointLines), StatusCode, RespBody]);
                 {error, Error} ->
-                    lager:warning("[influx_client] status_code: ~p, error is: ~p", [Error])
+                    lager:warning("[influx_client] url: ~p, status_code: ~p, error is: ~p", [Url, StatusCode, Error])
             end,
             hackney:close(ClientRef);
         {error, Reason} ->
-            lager:warning("[influx_client] request result is: ~p", [Reason])
+            lager:warning("[influx_client] url: ~p, request result is: ~p", [Url, Reason])
     end,
     {noreply, State}.
diff --git a/apps/iot/src/influxdb/influx_client_pool.erl b/apps/iot/src/influxdb/influx_client_pool.erl
index 4bbba4a..9137063 100644
--- a/apps/iot/src/influxdb/influx_client_pool.erl
+++ b/apps/iot/src/influxdb/influx_client_pool.erl
@@ -131,8 +131,10 @@ handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buf
     NQ = lists:foldl(fun(Point, Q0) -> queue:in(Point, Q0) end, Q, Points),
     %% Flush the buffer in one batch once it exceeds the configured size
-    case queue:len(NQ) >= ?BUFFER_SIZE of
+    Len = queue:len(NQ),
+    case Len >= ?BUFFER_SIZE of
         true ->
+            lager:debug("[influx_client_pool] bucket: ~p, flush buffer size: ~p", [Bucket, Len]),
             poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end),
             {noreply, State#state{buffers = maps:put(Bucket, queue:new(), Buffers)}};
         false ->
@@ -148,14 +150,17 @@ handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buf
 handle_info({timeout, _, flush_ticker}, State = #state{buffers = Buffers, pool_pid = PoolPid}) ->
     erlang:start_timer(5000, self(), flush_ticker),
-    lists:foreach(fun({Bucket, Q}) ->
-        case queue:len(Q) > 0 of
+    Acc = lists:foldl(fun({Bucket, Q}, Acc0) ->
+        Len = queue:len(Q),
+        case Len > 0 of
             true ->
-                poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end);
+                poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end),
+                Acc0 + Len;
             false ->
-                ok
+                Acc0
         end
-    end, maps:to_list(Buffers)),
+    end, 0, maps:to_list(Buffers)),
+    lager:debug("[influx_client_pool] flush_ticker acc write num: ~p", [Acc]),
     {noreply, State#state{buffers = #{}}};