diff --git a/apps/iot/src/influxdb/influx_client.erl b/apps/iot/src/influxdb/influx_client.erl index 083dc3a..c16379f 100644 --- a/apps/iot/src/influxdb/influx_client.erl +++ b/apps/iot/src/influxdb/influx_client.erl @@ -12,21 +12,11 @@ -behaviour(gen_server). %% API --export([start_link/1, write/4, write/5, write_data/4, create_bucket/3, get_orgs/1]). --export([get_precision/1]). --export([get_bucket/1]). --export([create_all_buckets/1]). +-export([start_link/1, write/4, write/5, create_bucket/3, get_orgs/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(SERVER, ?MODULE). - --define(INFLUX_POOl, influx_pool). - --define(DEFAULT_BUCKET, <<"metric">>). --define(DEFAULT_ORG, <<"nannong">>). - -record(state, { host, port, @@ -37,76 +27,10 @@ %%% API %%%=================================================================== -%% 数据过滤器 -data_filter(#{<<"key">> := Key}) when is_binary(Key), Key /= <<>> -> - true; -data_filter(_) -> - false. - -%% 获取时间标识符号 --spec get_precision(Timestamp :: integer()) -> binary(). -get_precision(Timestamp) when is_integer(Timestamp) -> - case length(integer_to_list(Timestamp)) of - 10 -> - <<"s">>; - 13 -> - <<"ms">>; - 16 -> - <<"u">>; - 19 -> - <<"ns">>; - _ -> - <<"ms">> - end. - --spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return(). -write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) -> - %% 过来掉没有key的选项 - NFieldsList = lists:filter(fun data_filter/1, FieldsList), - case length(NFieldsList) > 0 of - true -> - %% 按照设备的uuid进行分组 - Points = lists:map(fun(Fields = #{<<"key">> := Key}) -> - Values = maps:remove(<<"key">>, Fields), - NFields = #{Key => Values}, - influx_point:new(Measurement, Tags, NFields, Timestamp) - end, NFieldsList), - Precision = influx_client:get_precision(Timestamp), - Bucket = get_bucket(Measurement), - - %poolboy:transaction(influx_pool_backup, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end); - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, Bucket, ?DEFAULT_ORG, Precision, Points) end); - false -> - ok - end. - --spec get_bucket(DeviceUUID :: binary()) -> binary(). -get_bucket(DeviceUUID) when is_binary(DeviceUUID) -> - SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []), - case lists:member(DeviceUUID, SuperDeviceUUIDs) of - true -> - <<"metric_", DeviceUUID/binary>>; - false -> - ?DEFAULT_BUCKET - end. - -spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Points :: list()) -> no_return(). write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) -> write(Pid, Bucket, Org, <<"ms">>, Points). --spec create_all_buckets(SuperDeviceUUIDs :: list()) -> no_return(). -create_all_buckets(SuperDeviceUUIDs) when is_list(SuperDeviceUUIDs) -> - poolboy:transaction(influx_pool, fun(Pid) -> - {ok, Orgs} = get_orgs(Pid), - OrgMap = maps:from_list(lists:map(fun(#{<<"name">> := Name, <<"id">> := Id}) -> {Name, Id} end, Orgs)), - OrgId = maps:get(?DEFAULT_ORG, OrgMap, <<"">>), - lager:debug("[influx_client] org_name: ~p, id is: ~p", [?DEFAULT_ORG, OrgId]), - lists:foreach(fun(DeviceUUID) -> - Bucket = <<"metric_", DeviceUUID/binary>>, - influx_client:create_bucket(Pid, OrgId, Bucket) - end, SuperDeviceUUIDs) - end). - %% Precision的值为: ms|ns|s; 表示时间的精度,默认为毫秒(ms) -spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Precision :: binary(), Points :: list()) -> no_return(). write(Pid, Bucket, Org, Precision, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_binary(Precision), is_list(Points) -> diff --git a/apps/iot/src/influxdb/influx_client_pool.erl b/apps/iot/src/influxdb/influx_client_pool.erl new file mode 100644 index 0000000..c8a08d1 --- /dev/null +++ b/apps/iot/src/influxdb/influx_client_pool.erl @@ -0,0 +1,165 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% +%%% @end +%%% Created : 03. 9月 2024 11:32 +%%%------------------------------------------------------------------- +-module(influx_client_pool). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([write_data/4]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-define(DEFAULT_BUCKET, <<"metric">>). +-define(DEFAULT_ORG, <<"nannong">>). + +-record(state, { + pool_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return(). +write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) -> + %% 过来掉没有key的选项 + Points = lists:flatmap(fun(Fields) -> + case Fields of + #{<<"key">> := Key} -> + Values = maps:remove(<<"key">>, Fields), + NFields = #{Key => Values}, + Point = influx_point:new(Measurement, Tags, NFields, Timestamp), + [Point]; + _ -> + [] + end + end, FieldsList), + + Precision = case length(integer_to_list(Timestamp)) of + 10 -> <<"s">>; + 13 -> <<"ms">>; + 16 -> <<"u">>; + 19 -> <<"ns">>; + _ -> <<"ms">> + end, + Bucket = get_bucket(Measurement), + + gen_server:cast(?SERVER, {write_data, Bucket, ?DEFAULT_ORG, Precision, Points}). + +-spec get_bucket(DeviceUUID :: binary()) -> binary(). +get_bucket(DeviceUUID) when is_binary(DeviceUUID) -> + SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []), + case lists:member(DeviceUUID, SuperDeviceUUIDs) of + true -> + <<"metric_", DeviceUUID/binary>>; + false -> + ?DEFAULT_BUCKET + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, PoolProps} = application:get_env(iot, influx_pool), + PoolSize = proplists:get_value(pool_size, PoolProps), + WorkerArgs = proplists:get_value(worker_args, PoolProps), + + %% 启动工作的线程池 + {ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, influx_client}], WorkerArgs), + %% 创建特殊的buckets + SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []), + catch create_buckets(PoolPid, SuperDeviceUUIDs), + + {ok, #state{pool_pid = PoolPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({write_data, _Bucket, _Org, _Precision, []}, State) -> + {noreply, State}; +handle_cast({write_data, Bucket, Org, Precision, Points}, State = #state{pool_pid = PoolPid}) -> + poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, Bucket, Org, Precision, Points) end), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 创建特殊的buckets +-spec create_buckets(PoolPid :: pid(), SuperDeviceUUIDs :: list()) -> no_return(). +create_buckets(PoolPid, []) when is_pid(PoolPid) -> + ok; +create_buckets(PoolPid, SuperDeviceUUIDs) when is_pid(PoolPid), is_list(SuperDeviceUUIDs) -> + poolboy:transaction(PoolPid, fun(Pid) -> + {ok, Orgs} = influx_client:get_orgs(Pid), + OrgMap = maps:from_list(lists:map(fun(#{<<"name">> := Name, <<"id">> := Id}) -> {Name, Id} end, Orgs)), + OrgId = maps:get(?DEFAULT_ORG, OrgMap, <<"">>), + lager:debug("[influx_client_pool] org_name: ~p, id is: ~p", [?DEFAULT_ORG, OrgId]), + [influx_client:create_bucket(Pid, OrgId, <<"metric_", DeviceUUID/binary>>) || DeviceUUID <- SuperDeviceUUIDs] + end). \ No newline at end of file diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 0732e45..ea7eee3 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -27,16 +27,6 @@ start(_StartType, _StartArgs) -> %% 启动http服务 start_http_server(), - %% 创建特殊的buckets - SuperDeviceUUIDs = iot_env:get_value(super_device_uuids, []), - case is_list(SuperDeviceUUIDs) andalso length(SuperDeviceUUIDs) > 0 of - true -> - Result = catch influx_client:create_all_buckets(SuperDeviceUUIDs), - lager:debug("[iot_app] create_all_buckets: ~p, result: ~p", [SuperDeviceUUIDs, Result]); - false -> - ok - end, - {ok, SupPid}. stop(_State) -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index ec01be3..79f0fa9 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -585,7 +585,7 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName %% 数据写入influxdb NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID}, - influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp), + influx_client_pool:write_data(DeviceUUID, NTags, FieldsList, Timestamp), iot_device:change_status(DevicePid, ?DEVICE_ONLINE), @@ -608,7 +608,7 @@ handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"field %% 数据写入influxdb NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, - influx_client:write_data(UUID, NTags, FieldsList, Timestamp). + influx_client_pool:write_data(UUID, NTags, FieldsList, Timestamp). -spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return(). report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) -> diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 61f51cd..aa11084 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -37,6 +37,15 @@ init([]) -> modules => ['iot_watchdog'] }, + #{ + id => influx_client_pool, + start => {'influx_client_pool', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['influx_client_pool'] + }, + #{ id => 'iot_task', start => {'iot_task', start_link, []}, diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index 4c2a26b..1a09cc8 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -13,21 +13,7 @@ %% API -export([rsa_encode/1]). -export([insert_services/1]). --export([test_mqtt/0, test_influxdb/0]). - -test_influxdb() -> - UUID = <<"device123123">>, - - lists:foreach(fun(Id) -> - Point = influx_point:new(<<"shui_biao">>, - [{<<"uuid">>, UUID}, {<<"service_name">>, <<"shui_biao">>}], - [{<<"cost">>, Id}], - iot_util:timestamp()), - - poolboy:transaction(influx_pool, fun(Pid) -> - influx_client:write(Pid, <<"iot">>, <<"iot">>, [Point]) - end) - end, lists:seq(1, 100)). +-export([test_mqtt/0]). test_mqtt() -> iot_zd_endpoint:forward(<<"location_code_test123">>, <<"location_code_test123">>, [ diff --git a/config/sys-dev.config b/config/sys-dev.config index 0d69566..8eaa5e4 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -61,6 +61,16 @@ {pool_size, 10} ]}, + %% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234 + {influx_pool, [ + {pool_size, 100}, + {worker_args, [ + {host, "39.98.184.67"}, + {port, 8086}, + {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} + ]} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, @@ -85,28 +95,7 @@ {port, 26379}, {database, 1} ] - }, - - %% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234 - {influx_pool, - [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], - [ - {host, "39.98.184.67"}, - {port, 8086}, - {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} - ] - }, - - %% 备份库 - {influx_pool_backup, - [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], - [ - {host, "39.98.184.67"}, - {port, 8086}, - {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} - ] } - ]} ]}, diff --git a/config/sys-prod.config b/config/sys-prod.config index 5b5adf6..083e173 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -50,6 +50,16 @@ {pool_size, 10} ]}, + %% influxdb数据库配置 + {influx_pool, [ + {pool_size, 100}, + {worker_args, [ + {host, "172.19.0.4"}, + {port, 8086}, + {token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>} + ]} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, @@ -74,26 +84,6 @@ {port, 6379}, {database, 1} ] - }, - - %% influxdb数据库配置 - {influx_pool, - [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], - [ - {host, "172.19.0.4"}, - {port, 8086}, - {token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>} - ] - }, - - %% influxdb备份库 - {influx_pool_backup, - [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], - [ - {host, "172.19.0.4"}, - {port, 8086}, - {token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>} - ] } ]} diff --git a/config/sys-test.config b/config/sys-test.config index 18f1f02..c91219c 100644 --- a/config/sys-test.config +++ b/config/sys-test.config @@ -50,6 +50,15 @@ {pool_size, 10} ]}, + {influx_pool, [ + {pool_size, 100}, + {worker_args, [ + {host, "172.16.0.17"}, + {port, 8086}, + {token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>} + ]} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, @@ -74,26 +83,6 @@ {port, 6379}, {database, 1} ] - }, - - %% influxdb数据库配置 - {influx_pool, - [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], - [ - {host, "172.16.0.17"}, - {port, 8086}, - {token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>} - ] - }, - - %% influxdb备份库 - {influx_pool_backup, - [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], - [ - {host, "172.16.0.17"}, - {port, 8086}, - {token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>} - ] } ]}