Rework influxdb

This commit is contained in:
anlicheng 2024-09-03 14:39:27 +08:00
parent 9b53174da4
commit 5b9675ba7f
9 changed files with 207 additions and 165 deletions

View File

@@ -12,21 +12,11 @@
-behaviour(gen_server).
%% API
-export([start_link/1, write/4, write/5, write_data/4, create_bucket/3, get_orgs/1]).
-export([get_precision/1]).
-export([get_bucket/1]).
-export([create_all_buckets/1]).
-export([start_link/1, write/4, write/5, create_bucket/3, get_orgs/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(INFLUX_POOl, influx_pool).
-define(DEFAULT_BUCKET, <<"metric">>).
-define(DEFAULT_ORG, <<"nannong">>).
-record(state, {
host,
port,
@@ -37,76 +27,10 @@
%%% API
%%%===================================================================
%%
data_filter(#{<<"key">> := Key}) when is_binary(Key), Key /= <<>> ->
true;
data_filter(_) ->
false.
%%
-spec get_precision(Timestamp :: integer()) -> binary().
get_precision(Timestamp) when is_integer(Timestamp) ->
case length(integer_to_list(Timestamp)) of
10 ->
<<"s">>;
13 ->
<<"ms">>;
16 ->
<<"u">>;
19 ->
<<"ns">>;
_ ->
<<"ms">>
end.
-spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return().
write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) ->
%% Keep only entries that carry a key
NFieldsList = lists:filter(fun data_filter/1, FieldsList),
case length(NFieldsList) > 0 of
true ->
%% Group the values by uuid (the <<"key">> field)
Points = lists:map(fun(Fields = #{<<"key">> := Key}) ->
Values = maps:remove(<<"key">>, Fields),
NFields = #{Key => Values},
influx_point:new(Measurement, Tags, NFields, Timestamp)
end, NFieldsList),
Precision = influx_client:get_precision(Timestamp),
Bucket = get_bucket(Measurement),
%poolboy:transaction(influx_pool_backup, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end);
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, Precision, Points) end);
false ->
ok
end.
-spec get_bucket(DeviceUUID :: binary()) -> binary().
get_bucket(DeviceUUID) when is_binary(DeviceUUID) ->
SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []),
case lists:member(DeviceUUID, SuperDeviceUUIDs) of
true ->
<<"metric_", DeviceUUID/binary>>;
false ->
?DEFAULT_BUCKET
end.
-spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Points :: list()) -> no_return().
write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) ->
write(Pid, Bucket, Org, <<"ms">>, Points).
-spec create_all_buckets(SuperDeviceUUIDs :: list()) -> no_return().
create_all_buckets(SuperDeviceUUIDs) when is_list(SuperDeviceUUIDs) ->
poolboy:transaction(influx_pool, fun(Pid) ->
{ok, Orgs} = get_orgs(Pid),
OrgMap = maps:from_list(lists:map(fun(#{<<"name">> := Name, <<"id">> := Id}) -> {Name, Id} end, Orgs)),
OrgId = maps:get(?DEFAULT_ORG, OrgMap, <<"">>),
lager:debug("[influx_client] org_name: ~p, id is: ~p", [?DEFAULT_ORG, OrgId]),
lists:foreach(fun(DeviceUUID) ->
Bucket = <<"metric_", DeviceUUID/binary>>,
influx_client:create_bucket(Pid, OrgId, Bucket)
end, SuperDeviceUUIDs)
end).
%% Precision is one of: ms | ns | s (default: ms)
-spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Precision :: binary(), Points :: list()) -> no_return().
write(Pid, Bucket, Org, Precision, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_binary(Precision), is_list(Points) ->

View File

@@ -0,0 +1,165 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 03. Sep 2024 11:32
%%%-------------------------------------------------------------------
-module(influx_client_pool).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([write_data/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(DEFAULT_BUCKET, <<"metric">>).
-define(DEFAULT_ORG, <<"nannong">>).
-record(state, {
pool_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return().
write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) ->
%% Keep only entries that carry a key
Points = lists:flatmap(fun(Fields) ->
case Fields of
#{<<"key">> := Key} ->
Values = maps:remove(<<"key">>, Fields),
NFields = #{Key => Values},
Point = influx_point:new(Measurement, Tags, NFields, Timestamp),
[Point];
_ ->
[]
end
end, FieldsList),
Precision = case length(integer_to_list(Timestamp)) of
10 -> <<"s">>;
13 -> <<"ms">>;
16 -> <<"u">>;
19 -> <<"ns">>;
_ -> <<"ms">>
end,
Bucket = get_bucket(Measurement),
gen_server:cast(?SERVER, {write_data, Bucket, ?DEFAULT_ORG, Precision, Points}).
-spec get_bucket(DeviceUUID :: binary()) -> binary().
get_bucket(DeviceUUID) when is_binary(DeviceUUID) ->
SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []),
case lists:member(DeviceUUID, SuperDeviceUUIDs) of
true ->
<<"metric_", DeviceUUID/binary>>;
false ->
?DEFAULT_BUCKET
end.
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, PoolProps} = application:get_env(iot, influx_pool),
PoolSize = proplists:get_value(pool_size, PoolProps),
WorkerArgs = proplists:get_value(worker_args, PoolProps),
%% Start the influx worker pool
{ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, influx_client}], WorkerArgs),
%% Create buckets for the super devices
SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []),
catch create_buckets(PoolPid, SuperDeviceUUIDs),
{ok, #state{pool_pid = PoolPid}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({write_data, _Bucket, _Org, _Precision, []}, State) ->
{noreply, State};
handle_cast({write_data, Bucket, Org, Precision, Points}, State = #state{pool_pid = PoolPid}) ->
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, Org, Precision, Points) end),
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Create buckets for the super devices
-spec create_buckets(PoolPid :: pid(), SuperDeviceUUIDs :: list()) -> no_return().
create_buckets(PoolPid, []) when is_pid(PoolPid) ->
ok;
create_buckets(PoolPid, SuperDeviceUUIDs) when is_pid(PoolPid), is_list(SuperDeviceUUIDs) ->
poolboy:transaction(PoolPid, fun(Pid) ->
{ok, Orgs} = influx_client:get_orgs(Pid),
OrgMap = maps:from_list(lists:map(fun(#{<<"name">> := Name, <<"id">> := Id}) -> {Name, Id} end, Orgs)),
OrgId = maps:get(?DEFAULT_ORG, OrgMap, <<"">>),
lager:debug("[influx_client_pool] org_name: ~p, id is: ~p", [?DEFAULT_ORG, OrgId]),
[influx_client:create_bucket(Pid, OrgId, <<"metric_", DeviceUUID/binary>>) || DeviceUUID <- SuperDeviceUUIDs]
end).
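For context, a minimal call sketch for the new asynchronous writer; the measurement, tags, field values and timestamp helper below are illustrative, not taken from this commit:

%% Each field map must carry a <<"key">>; entries without one are dropped.
%% A 13-digit timestamp is detected as millisecond precision.
influx_client_pool:write_data(
    <<"device-uuid-0001">>,                                              %% measurement (device uuid)
    #{<<"uuid">> => <<"svc-1">>, <<"service_name">> => <<"shui_biao">>}, %% tags
    [#{<<"key">> => <<"sensor-1">>, <<"cost">> => 12}],                  %% one point per keyed entry
    erlang:system_time(millisecond)).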

View File

@@ -27,16 +27,6 @@ start(_StartType, _StartArgs) ->
%% Start the http server
start_http_server(),
%% Create buckets
SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []),
case is_list(SuperDeviceUUIDs) andalso length(SuperDeviceUUIDs) > 0 of
true ->
Result = catch influx_client:create_all_buckets(SuperDeviceUUIDs),
lager:debug("[iot_app] create_all_buckets: ~p, result: ~p", [SuperDeviceUUIDs, Result]);
false ->
ok
end,
{ok, SupPid}.
stop(_State) ->

View File

@@ -585,7 +585,7 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName
%% influxdb
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID},
influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
influx_client_pool:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
@@ -608,7 +608,7 @@ handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"field
%% influxdb
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
influx_client:write_data(UUID, NTags, FieldsList, Timestamp).
influx_client_pool:write_data(UUID, NTags, FieldsList, Timestamp).
-spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return().
report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->

View File

@@ -37,6 +37,15 @@ init([]) ->
modules => ['iot_watchdog']
},
#{
id => influx_client_pool,
start => {'influx_client_pool', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['influx_client_pool']
},
#{
id => 'iot_task',
start => {'iot_task', start_link, []},

View File

@@ -13,21 +13,7 @@
%% API
-export([rsa_encode/1]).
-export([insert_services/1]).
-export([test_mqtt/0, test_influxdb/0]).
test_influxdb() ->
UUID = <<"device123123">>,
lists:foreach(fun(Id) ->
Point = influx_point:new(<<"shui_biao">>,
[{<<"uuid">>, UUID}, {<<"service_name">>, <<"shui_biao">>}],
[{<<"cost">>, Id}],
iot_util:timestamp()),
poolboy:transaction(influx_pool, fun(Pid) ->
influx_client:write(Pid, <<"iot">>, <<"iot">>, [Point])
end)
end, lists:seq(1, 100)).
-export([test_mqtt/0]).
test_mqtt() ->
iot_zd_endpoint:forward(<<"location_code_test123">>, <<"location_code_test123">>, [

View File

@@ -61,6 +61,16 @@
{pool_size, 10}
]},
%% influxdb database config (test environment; username: iot, password: password1234)
{influx_pool, [
{pool_size, 100},
{worker_args, [
{host, "39.98.184.67"},
{port, 8086},
{token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
]}
]},
{pools, [
%% mysql connection pool config
{mysql_iot,
@@ -85,28 +95,7 @@
{port, 26379},
{database, 1}
]
},
%% influxdb database config (test environment; username: iot, password: password1234)
{influx_pool,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[
{host, "39.98.184.67"},
{port, 8086},
{token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
]
},
%% Backup database
{influx_pool_backup,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[
{host, "39.98.184.67"},
{port, 8086},
{token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
]
}
]}
]},
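The relocated influx_pool entry is consumed directly from the iot application env by influx_client_pool:init/1; roughly, as a sketch mirroring the new module above rather than additional code from this commit:

%% Fetch the top-level entry and hand the worker args to poolboy.
{ok, PoolProps} = application:get_env(iot, influx_pool),
PoolSize = proplists:get_value(pool_size, PoolProps),      %% 100
WorkerArgs = proplists:get_value(worker_args, PoolProps),  %% host / port / token
{ok, PoolPid} = poolboy:start_link(
    [{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, influx_client}],
    WorkerArgs).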

View File

@@ -50,6 +50,16 @@
{pool_size, 10}
]},
%% influxdb database config
{influx_pool, [
{pool_size, 100},
{worker_args, [
{host, "172.19.0.4"},
{port, 8086},
{token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>}
]}
]},
{pools, [
%% mysql connection pool config
{mysql_iot,
@@ -74,26 +84,6 @@
{port, 6379},
{database, 1}
]
},
%% influxdb database config
{influx_pool,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[
{host, "172.19.0.4"},
{port, 8086},
{token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>}
]
},
%% influxdb backup database
{influx_pool_backup,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[
{host, "172.19.0.4"},
{port, 8086},
{token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>}
]
}
]}

View File

@@ -50,6 +50,15 @@
{pool_size, 10}
]},
{influx_pool, [
{pool_size, 100},
{worker_args, [
{host, "172.16.0.17"},
{port, 8086},
{token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>}
]}
]},
{pools, [
%% mysql connection pool config
{mysql_iot,
@@ -74,26 +83,6 @@
{port, 6379},
{database, 1}
]
},
%% influxdb database config
{influx_pool,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[
{host, "172.16.0.17"},
{port, 8086},
{token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>}
]
},
%% influxdb backup database
{influx_pool_backup,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[
{host, "172.16.0.17"},
{port, 8086},
{token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>}
]
}
]}