fix device poll

This commit is contained in:
anlicheng 2025-04-08 10:58:16 +08:00
parent 859aea44f2
commit 856072a9f2
11 changed files with 509 additions and 10 deletions

View File

@ -11,7 +11,7 @@
-include("iot.hrl").
%% API
-export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2]).
-export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2, change_edge_status/2]).
-spec get_all_devices() -> {ok, Devices :: [map()]} | {error, Reason :: any()}.
get_all_devices() ->
@ -52,3 +52,8 @@ change_status0(DeviceUUID, ?DEVICE_ONLINE) when is_binary(DeviceUUID) ->
end;
change_status0(DeviceUUID, ?DEVICE_OFFLINE) when is_binary(DeviceUUID) ->
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]).
%%
-spec change_edge_status(DeviceUUID :: binary(), NEdgeStatus :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
change_edge_status(DeviceUUID, NEdgeStatus) when is_binary(DeviceUUID), is_integer(NEdgeStatus) ->
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET edge_status = ? WHERE device_uuid = ? LIMIT 1">>, [NEdgeStatus, DeviceUUID]).

View File

@ -12,6 +12,7 @@
%% API
-export([handle_request/4]).
-export([test/1]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
@ -63,6 +64,58 @@ handle_request("POST", "/device/activate", _, #{<<"host_id">> := HostId, <<"devi
end
end;
%%
handle_request("POST", "/device/query_edge_status", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID, <<"timeout">> := Timeout}) when is_integer(HostId), is_binary(DeviceUUID) ->
lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostId, DeviceUUID]),
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
HostPid when is_pid(HostPid) ->
Ref = make_ref(),
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
receive
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
{poll_task_reply, Ref, {error, Reason}} ->
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
after Timeout * 1000 ->
lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)}
end
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
test(DeviceUUID) when is_binary(DeviceUUID) ->
Timeout = 10,
HostUUID = <<"0356e898adc51ac449b215627b7d8e55">>,
lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostUUID, DeviceUUID]),
case iot_host:get_pid(HostUUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
HostPid when is_pid(HostPid) ->
Ref = make_ref(),
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
receive
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
% {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
{poll_task_reply, Ref, {error, Reason}} ->
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
after Timeout * 1000 ->
lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)}
end
end.

View File

@ -2,7 +2,7 @@
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%%
%%% @end
%%% Created : 03. 9 2024 11:32
%%%-------------------------------------------------------------------

View File

@ -0,0 +1,155 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 03. 9 2024 11:32
%%%-------------------------------------------------------------------
-module(influx_monitor_client_pool).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([write_data/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% buffer的数量
-define(BUFFER_SIZE, 1500).
-define(DEFAULT_BUCKET, <<"monitor">>).
-define(DEFAULT_ORG, <<"nannong">>).
-record(state, {
pool_pid :: pid(),
%% ,
q = queue:new()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% Measurement: device_status
%% Tags: #{<<"device_uuid">> => <<"abc">>}
%% Fields: #{<<"edge_status">> => 1}
%% Timestamp: 13
-spec write_data(Measurement :: binary(), Tags :: map(), Fields :: map(), Timestamp :: integer()) -> no_return().
write_data(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_map(Tags), is_map(Fields), is_integer(Timestamp) ->
Point = influx_point:new(Measurement, Tags, Fields, format_timestamp(Timestamp)),
gen_server:cast(?SERVER, {write_data, [Point]}).
format_timestamp(Timestamp) when is_integer(Timestamp) ->
case length(integer_to_list(Timestamp)) of
10 -> Timestamp * 1000;
13 -> Timestamp;
16 -> Timestamp div 1000;
19 -> Timestamp div 1000_000;
_ -> Timestamp
end.
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, PoolProps} = application:get_env(iot, influx_monitor_pool),
PoolSize = proplists:get_value(pool_size, PoolProps),
WorkerArgs = proplists:get_value(worker_args, PoolProps),
%% 线
{ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, influx_client}], WorkerArgs),
%%
erlang:start_timer(5000, self(), flush_ticker),
{ok, #state{pool_pid = PoolPid}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({write_data, []}, State) ->
{noreply, State};
handle_cast({write_data, Points}, State = #state{pool_pid = PoolPid, q = Q}) ->
NQ = queue:join(Q, queue:from_list(Points)),
%%
Len = queue:len(NQ),
case Len >= ?BUFFER_SIZE of
true ->
poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end),
{noreply, State#state{q = queue:new()}};
false ->
{noreply, State#state{q = NQ}}
end.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, flush_ticker}, State = #state{q = Q, pool_pid = PoolPid}) ->
erlang:start_timer(5000, self(), flush_ticker),
Len = queue:len(Q),
Len > 0 andalso poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end),
lager:debug("[influx_monitor_client_pool] flush_ticker acc write num: ~p", [Len]),
{noreply, State#state{q = queue:new()}};
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -25,6 +25,8 @@
%% id
device_id :: integer(),
device_uuid :: binary(),
%% id
host_id :: integer(),
%%
auth_status :: integer(),
%%
@ -35,6 +37,12 @@
%%
model_id = 0,
%%
poll_interval = 0 :: integer(),
poll_ref :: undefined | reference(),
%%
edge_status = 0 :: integer(),
status = ?DEVICE_OFFLINE
}).
@ -139,11 +147,19 @@ init([DeviceUUID]) when is_binary(DeviceUUID) ->
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
ignore
end;
init([#{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) ->
init([DeviceInfo = #{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"host_id">> := HostId, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) ->
{ok, AiEventThrottle} = application:get_env(iot, ai_event_throttle),
{ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle,
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}.
EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0),
%%
{ok, PollInterval0} = application:get_env(iot, device_poll_interval),
PollInterval = PollInterval0 * 1000,
erlang:start_timer(PollInterval, self(), poll_ticker),
{ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus,
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId), poll_interval = PollInterval}}.
%% @private
%% @doc Handling call messages
@ -266,6 +282,54 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, poll_ticker}, State = #state{poll_interval = PollInterval, device_uuid = DeviceUUID, host_id = HostId}) ->
%%
erlang:start_timer(PollInterval, self(), poll_ticker),
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
lager:warning("[iot_device] device_uuid: ~p, host not found", [DeviceUUID]),
{noreply, State};
HostPid when is_pid(HostPid) ->
%%
Ref = make_ref(),
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, 10),
{noreply, State#state{poll_ref = Ref}}
end;
%%
handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
case EdgeStatus =/= LastEdgeStatus of
true ->
lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]),
save_device_edge_status(DeviceUUID, EdgeStatus, LastEdgeStatus, Status),
{noreply, State#state{edge_status = EdgeStatus}};
false ->
{noreply, State}
end;
%%
handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]),
EdgeStatus = -1,
case EdgeStatus =/= LastEdgeStatus of
true ->
save_device_edge_status(DeviceUUID, EdgeStatus, LastEdgeStatus, Status),
{noreply, State#state{edge_status = EdgeStatus}};
false ->
{noreply, State}
end;
%%
handle_info({poll_task_reply, Ref, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, poll_ref = Ref}) ->
lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]),
{noreply, State};
handle_info(_Info, State = #state{}) ->
{noreply, State}.
@ -344,3 +408,18 @@ rewrite_ai_event(EventType, Params) when is_integer(EventType), is_map(Params) -
undefined ->
error
end.
-spec save_device_edge_status(DeviceUUID :: binary(), EdgeStatus :: integer(), LastEdgeStatus :: integer(), Status :: integer()) -> no_return().
save_device_edge_status(DeviceUUID, EdgeStatus, LastEdgeStatus, Status)
when is_binary(DeviceUUID), is_integer(EdgeStatus), is_integer(LastEdgeStatus), is_integer(Status) ->
%%
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
%% influx表
Tags = #{<<"device_uuid">> => DeviceUUID},
Fields = #{
<<"edge_status">> => EdgeStatus,
<<"last_edge_status">> => LastEdgeStatus,
<<"status">> => Status
},
influx_monitor_client_pool:write_data(<<"device_status">>, Tags, Fields, iot_util:timestamp()).

View File

@ -0,0 +1,135 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 10. 3 2025 21:09
%%%-------------------------------------------------------------------
-module(iot_device_poll_task).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_monitor/0]).
-export([poll_task/6]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
-spec poll_task(Pid :: pid(), Ref :: reference(), HostPid :: pid(), ReceiverPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return().
poll_task(Pid, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout)
when is_pid(Pid), is_reference(Ref), is_pid(HostPid), is_pid(ReceiverPid), is_binary(DeviceUUID), is_integer(Timeout) ->
gen_server:cast(Pid, {poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_monitor() ->
{'ok', {Pid :: pid(), MonRef :: reference()}} | 'ignore' | {'error', Reason :: term()}).
start_monitor() ->
gen_server:start_monitor(?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}, State = #state{}) ->
Command = #{
<<"device_uuid">> => DeviceUUID,
<<"timeout">> => Timeout,
<<"command">> => <<"query_status">>
},
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
{ok, Reply} when is_binary(Reply) ->
case catch jiffy:decode(Reply, [return_maps]) of
%%
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
true ->
ReceiverPid ! {poll_task_reply, Ref, {ok, 1}};
false ->
case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of
true ->
ReceiverPid ! {poll_task_reply, Ref, {ok, 0}};
false ->
ReceiverPid ! {poll_task_reply, Ref, {ok, lists:min(EdgeStatusList)}}
end
end;
_ ->
ReceiverPid ! {poll_task_reply, Ref, {error, <<"invalid reply json">>}}
end;
{error, Reason} ->
ReceiverPid ! {poll_task_reply, Ref, {error, Reason}}
end,
{stop, normal, State};
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -46,6 +46,15 @@ init([]) ->
modules => ['influx_client_pool']
},
#{
id => influx_monitor_client_pool,
start => {'influx_monitor_client_pool', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['influx_monitor_client_pool']
},
#{
id => 'iot_task',
start => {'iot_task', start_link, []},

View File

@ -14,6 +14,9 @@
%% 数据的最大缓存量
{device_cache_size, 200},
%% 设备状态轮询周期,单位秒
{device_poll_interval, 300},
%% 事件的间隔处理逻辑
{ai_event_throttle, #{
15 => 300
@ -86,6 +89,16 @@
]}
]},
%% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234
{influx_monitor_pool, [
{pool_size, 100},
{worker_args, [
{host, "39.98.184.67"},
{port, 18086},
{token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
]}
]},
%% 智慧监控平台
{donghuoliren, [
{url, "https://xsdc.njau.edu.cn/hq-cyaqjg/rest/rgkSmart/push"},
@ -153,7 +166,7 @@
{handlers, [
%% debug | info | warning | error, 日志级别
{lager_console_backend, debug},
{lager_console_backend, warning},
{lager_file_backend, [{file, "debug.log"}, {level, debug}, {size, 314572800}]},
{lager_file_backend, [{file, "notice.log"}, {level, notice}, {size, 314572800}]},
{lager_file_backend, [{file, "error.log"}, {level, error}, {size, 314572800}]},

View File

@ -14,6 +14,9 @@
%% 数据的最大缓存量
{device_cache_size, 200},
%% 设备状态轮询周期,单位秒
{device_poll_interval, 300},
%% 事件的间隔处理逻辑
{ai_event_throttle, #{
15 => 300
@ -75,6 +78,16 @@
]}
]},
%% influxdb监控数据库配置, 共用同一个influxdb
{influx_monitor_pool, [
{pool_size, 100},
{worker_args, [
{host, "172.19.0.4"},
{port, 8086},
{token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>}
]}
]},
{pools, [
%% mysql连接池配置
{mysql_iot,

View File

@ -14,6 +14,9 @@
%% 数据的最大缓存量
{device_cache_size, 200},
%% 设备状态轮询周期,单位秒
{device_poll_interval, 300},
{watchdog, [
{pri_key, "jinzhi_watchdog_pri.key"},
{url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"},
@ -59,6 +62,15 @@
]}
]},
{influx_monitor_pool, [
{pool_size, 100},
{worker_args, [
{host, "172.16.0.17"},
{port, 8086},
{token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>}
]}
]},
{pools, [
%% mysql连接池配置
{mysql_iot,

View File

@ -93,3 +93,28 @@
}
```
```ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss
## 设备状态轮询
## 2. 下发的数据格式如下
<<17:8, Body:任意长度(先json序列化然后aes加密)>>, 其中
Body:
```json
{
"device_uuid": "设备的uuid",
// 命令执行的超时时间,单位为秒
"t": 10,
"command": "query_status"
}
```
返回值:
```json
{
"edge_status": int,
"message": "描述信息"
}
```