diff --git a/apps/iot/priv/env.config b/apps/iot/priv/env.config index f58cd89..0c89ab0 100644 --- a/apps/iot/priv/env.config +++ b/apps/iot/priv/env.config @@ -1,5 +1,29 @@ [ {super_device_uuids, [ + <<"30114946327406592017252591921319">>, + <<"30114747234700902417252587174574">>, + <<"30114595624033075217252583559890">>, + <<"30114100406754099217252571752996">>, + <<"30114013466381926417252569680176">>, + <<"30008218963725926417250047342671">>, + <<"30008101126013337617250044533208">>, + <<"30007942149728256017250040742912">>, + <<"30007735838625382417250035824079">>, + <<"30007384475487846417250027446924">>, + <<"30006962058743398417250017375721">>, + <<"30006822479436595217250014047898">>, + <<"30006688212215808017250010846715">>, + <<"30006604734241587217250008856441">>, + <<"30006271076938956817250000901434">>, + <<"30006167699508838417249998436728">>, + <<"30005985602122956817249994095184">>, + <<"30005889040436838417249991792972">>, + <<"30005793894328320017249989524512">>, + <<"30005663919558246417249986425677">>, + <<"30005503095682252817249982591332">>, + <<"29975356847319040017249263848766">>, + <<"29974871910698598417249252286976">>, + <<"29973464758616473617249218737857">>, <<"29717641877153792017243119445063">> ]} ]. \ No newline at end of file diff --git a/apps/iot/src/influxdb/influx_client_pool.erl b/apps/iot/src/influxdb/influx_client_pool.erl index 36b1beb..4bbba4a 100644 --- a/apps/iot/src/influxdb/influx_client_pool.erl +++ b/apps/iot/src/influxdb/influx_client_pool.erl @@ -20,11 +20,16 @@ -define(SERVER, ?MODULE). +%% buffer的数量 +-define(BUFFER_SIZE, 1500). + -define(DEFAULT_BUCKET, <<"metric">>). -define(DEFAULT_ORG, <<"nannong">>). -record(state, { - pool_pid :: pid() + pool_pid :: pid(), + %% 缓冲区, 格式: #{bucket => []} + buffers = #{} }). %%%=================================================================== @@ -39,23 +44,24 @@ write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement) #{<<"key">> := Key} -> Values = maps:remove(<<"key">>, Fields), NFields = #{Key => Values}, - Point = influx_point:new(Measurement, Tags, NFields, Timestamp), + Point = influx_point:new(Measurement, Tags, NFields, format_timestamp(Timestamp)), [Point]; _ -> [] end end, FieldsList), - - Precision = case length(integer_to_list(Timestamp)) of - 10 -> <<"s">>; - 13 -> <<"ms">>; - 16 -> <<"u">>; - 19 -> <<"ns">>; - _ -> <<"ms">> - end, Bucket = get_bucket(Measurement), - gen_server:cast(?SERVER, {write_data, Bucket, ?DEFAULT_ORG, Precision, Points}). + gen_server:cast(?SERVER, {write_data, Bucket, Points}). + +format_timestamp(Timestamp) when is_integer(Timestamp) -> + case length(integer_to_list(Timestamp)) of + 10 -> Timestamp * 1000; + 13 -> Timestamp; + 16 -> Timestamp div 1000; + 19 -> Timestamp div 1000_000; + _ -> Timestamp + end. -spec get_bucket(DeviceUUID :: binary()) -> binary(). get_bucket(DeviceUUID) when is_binary(DeviceUUID) -> @@ -94,6 +100,9 @@ init([]) -> Result = catch create_buckets(PoolPid, SuperDeviceUUIDs), lager:debug("[influx_client_pool] create buckets result is: ~p", [Result]), + %% 定时刷新逻辑 + erlang:start_timer(5000, self(), flush_ticker), + {ok, #state{pool_pid = PoolPid}}. %% @private @@ -115,11 +124,20 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({write_data, _Bucket, _Org, _Precision, []}, State) -> +handle_cast({write_data, _Bucket, []}, State) -> {noreply, State}; -handle_cast({write_data, Bucket, Org, Precision, Points}, State = #state{pool_pid = PoolPid}) -> - poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, Org, Precision, Points) end), - {noreply, State}. +handle_cast({write_data, Bucket, Points}, State = #state{pool_pid = PoolPid, buffers = Buffers}) -> + Q = maps:get(Bucket, Buffers, queue:new()), + NQ = lists:foldl(fun(Point, Q0) -> queue:in(Point, Q0) end, Q, Points), + + %% 超过缓冲区设置的大小则批量导入 + case queue:len(NQ) >= ?BUFFER_SIZE of + true -> + poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end), + {noreply, State#state{buffers = maps:put(Bucket, queue:new(), Buffers)}}; + false -> + {noreply, State#state{buffers = maps:put(Bucket, NQ, Buffers)}} + end. %% @private %% @doc Handling all non call/cast messages @@ -127,6 +145,20 @@ handle_cast({write_data, Bucket, Org, Precision, Points}, State = #state{pool_pi {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, flush_ticker}, State = #state{buffers = Buffers, pool_pid = PoolPid}) -> + erlang:start_timer(5000, self(), flush_ticker), + + lists:foreach(fun({Bucket, Q}) -> + case queue:len(Q) > 0 of + true -> + poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end); + false -> + ok + end + end, maps:to_list(Buffers)), + + {noreply, State#state{buffers = #{}}}; + handle_info(_Info, State = #state{}) -> {noreply, State}.