add observer

This commit is contained in:
anlicheng 2023-12-21 11:13:39 +08:00
parent ef419f551c
commit 5f12140565
3 changed files with 44 additions and 8 deletions

View File

@ -35,6 +35,12 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
%%
data_filter(#{<<"key">> := Key}) when is_binary(Key), Key /= <<>> ->
true;
data_filter(_) ->
false.
%% %%
-spec get_precision(Timestamp :: integer()) -> binary(). -spec get_precision(Timestamp :: integer()) -> binary().
get_precision(Timestamp) when is_integer(Timestamp) -> get_precision(Timestamp) when is_integer(Timestamp) ->
@ -53,15 +59,22 @@ get_precision(Timestamp) when is_integer(Timestamp) ->
-spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return(). -spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return().
write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) -> write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) ->
%% uuid进行分组 %% key的选项
Points = lists:map(fun(Fields = #{<<"key">> := Key}) -> NFieldsList = lists:filter(fun data_filter/1, FieldsList),
Values = maps:remove(<<"key">>, Fields), case length(NFieldsList) > 0 of
NFields = #{Key => Values}, true ->
influx_point:new(Measurement, Tags, NFields, Timestamp) %% uuid进行分组
end, FieldsList), Points = lists:map(fun(Fields = #{<<"key">> := Key}) ->
Precision = influx_client:get_precision(Timestamp), Values = maps:remove(<<"key">>, Fields),
NFields = #{Key => Values},
influx_point:new(Measurement, Tags, NFields, Timestamp)
end, NFieldsList),
Precision = influx_client:get_precision(Timestamp),
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end). poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end);
false ->
ok
end.
-spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Points :: list()) -> no_return(). -spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Points :: list()) -> no_return().
write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) -> write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) ->

View File

@ -21,6 +21,7 @@
public_key, public_key,
ssl, ssl,
erts, erts,
observer,
kernel, kernel,
stdlib stdlib
]}, ]},

View File

@ -0,0 +1,22 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 21. 12 2023 11:08
%%%-------------------------------------------------------------------
-module(iot_observer).
-author("anlicheng").
%% API
-export([memory_top/1, cpu_top/1, stop/0]).
memory_top(Interval) when is_integer(Interval) ->
spawn(fun()->etop:start([{output, text}, {interval, Interval}, {lines, 20}, {sort, memory}])end).
cpu_top(Interval) when is_integer(Interval) ->
spawn(fun()->etop:start([{output, text}, {interval, Interval}, {lines, 20}, {sort, runtime}])end).
stop() ->
etop:stop().