From 856072a9f2280948d69c8901d42a0c505d8d9353 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 8 Apr 2025 10:58:16 +0800 Subject: [PATCH] fix device poll --- apps/iot/src/database/device_bo.erl | 9 +- apps/iot/src/http_handler/device_handler.erl | 55 ++++++- apps/iot/src/influxdb/influx_client_pool.erl | 2 +- .../influxdb/influx_monitor_client_pool.erl | 155 ++++++++++++++++++ apps/iot/src/iot_device.erl | 87 +++++++++- apps/iot/src/iot_device_poll_task.erl | 135 +++++++++++++++ apps/iot/src/iot_sup.erl | 9 + config/sys-dev.config | 15 +- config/sys-prod.config | 13 ++ config/sys-test.config | 12 ++ docs/publish_command.md | 27 ++- 11 files changed, 509 insertions(+), 10 deletions(-) create mode 100644 apps/iot/src/influxdb/influx_monitor_client_pool.erl create mode 100644 apps/iot/src/iot_device_poll_task.erl diff --git a/apps/iot/src/database/device_bo.erl b/apps/iot/src/database/device_bo.erl index 40875ac..7a37b72 100644 --- a/apps/iot/src/database/device_bo.erl +++ b/apps/iot/src/database/device_bo.erl @@ -11,7 +11,7 @@ -include("iot.hrl"). %% API --export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2]). +-export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2, change_edge_status/2]). -spec get_all_devices() -> {ok, Devices :: [map()]} | {error, Reason :: any()}. get_all_devices() -> @@ -51,4 +51,9 @@ change_status0(DeviceUUID, ?DEVICE_ONLINE) when is_binary(DeviceUUID) -> {error, <<"device not found">>} end; change_status0(DeviceUUID, ?DEVICE_OFFLINE) when is_binary(DeviceUUID) -> - mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]). \ No newline at end of file + mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]). + +%% 修改主机的状态 +-spec change_edge_status(DeviceUUID :: binary(), NEdgeStatus :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +change_edge_status(DeviceUUID, NEdgeStatus) when is_binary(DeviceUUID), is_integer(NEdgeStatus) -> + mysql_pool:update_by(mysql_iot, <<"UPDATE device SET edge_status = ? WHERE device_uuid = ? LIMIT 1">>, [NEdgeStatus, DeviceUUID]). \ No newline at end of file diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl index 3c4308c..2751bc3 100644 --- a/apps/iot/src/http_handler/device_handler.erl +++ b/apps/iot/src/http_handler/device_handler.erl @@ -12,6 +12,7 @@ %% API -export([handle_request/4]). +-export([test/1]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods @@ -63,6 +64,58 @@ handle_request("POST", "/device/activate", _, #{<<"host_id">> := HostId, <<"devi end end; +%% 设备状态查询 +handle_request("POST", "/device/query_edge_status", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID, <<"timeout">> := Timeout}) when is_integer(HostId), is_binary(DeviceUUID) -> + lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostId, DeviceUUID]), + AliasName = iot_host:get_alias_name(HostId), + + case global:whereis_name(AliasName) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}; + HostPid when is_pid(HostPid) -> + Ref = make_ref(), + {ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(), + iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout), + receive + {poll_task_reply, Ref, {ok, EdgeStatus}} -> + {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus), + {ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})}; + {poll_task_reply, Ref, {error, Reason}} -> + lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]), + {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)} + after Timeout * 1000 -> + lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]), + {ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)} + end + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), - {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file + {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. + + +test(DeviceUUID) when is_binary(DeviceUUID) -> + Timeout = 10, + + HostUUID = <<"0356e898adc51ac449b215627b7d8e55">>, + lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostUUID, DeviceUUID]), + + case iot_host:get_pid(HostUUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}; + HostPid when is_pid(HostPid) -> + Ref = make_ref(), + {ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(), + iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout), + receive + {poll_task_reply, Ref, {ok, EdgeStatus}} -> + % {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus), + {ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})}; + {poll_task_reply, Ref, {error, Reason}} -> + lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]), + {ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)} + after Timeout * 1000 -> + lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]), + {ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)} + end + end. diff --git a/apps/iot/src/influxdb/influx_client_pool.erl b/apps/iot/src/influxdb/influx_client_pool.erl index 13591a5..38a3228 100644 --- a/apps/iot/src/influxdb/influx_client_pool.erl +++ b/apps/iot/src/influxdb/influx_client_pool.erl @@ -2,7 +2,7 @@ %%% @author anlicheng %%% @copyright (C) 2024, %%% @doc -%%% +%%% 用于存储南农的设备数据 %%% @end %%% Created : 03. 9月 2024 11:32 %%%------------------------------------------------------------------- diff --git a/apps/iot/src/influxdb/influx_monitor_client_pool.erl b/apps/iot/src/influxdb/influx_monitor_client_pool.erl new file mode 100644 index 0000000..8b46aa2 --- /dev/null +++ b/apps/iot/src/influxdb/influx_monitor_client_pool.erl @@ -0,0 +1,155 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2024, +%%% @doc +%%% 用于存储南农的设备数据 +%%% @end +%%% Created : 03. 9月 2024 11:32 +%%%------------------------------------------------------------------- +-module(influx_monitor_client_pool). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([write_data/4]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%% buffer的数量 +-define(BUFFER_SIZE, 1500). + +-define(DEFAULT_BUCKET, <<"monitor">>). +-define(DEFAULT_ORG, <<"nannong">>). + +-record(state, { + pool_pid :: pid(), + %% 缓冲区, 批量写入 + q = queue:new() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% Measurement: device_status 检测的指标值,字符串 +%% Tags: #{<<"device_uuid">> => <<"abc">>} +%% Fields: #{<<"edge_status">> => 1} 监控的指标值 +%% Timestamp: 时间信息,13位整数精确到毫秒 +-spec write_data(Measurement :: binary(), Tags :: map(), Fields :: map(), Timestamp :: integer()) -> no_return(). +write_data(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_map(Tags), is_map(Fields), is_integer(Timestamp) -> + Point = influx_point:new(Measurement, Tags, Fields, format_timestamp(Timestamp)), + gen_server:cast(?SERVER, {write_data, [Point]}). + +format_timestamp(Timestamp) when is_integer(Timestamp) -> + case length(integer_to_list(Timestamp)) of + 10 -> Timestamp * 1000; + 13 -> Timestamp; + 16 -> Timestamp div 1000; + 19 -> Timestamp div 1000_000; + _ -> Timestamp + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, PoolProps} = application:get_env(iot, influx_monitor_pool), + PoolSize = proplists:get_value(pool_size, PoolProps), + WorkerArgs = proplists:get_value(worker_args, PoolProps), + + %% 启动工作的线程池 + {ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, influx_client}], WorkerArgs), + %% 定时刷新逻辑 + erlang:start_timer(5000, self(), flush_ticker), + + {ok, #state{pool_pid = PoolPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({write_data, []}, State) -> + {noreply, State}; +handle_cast({write_data, Points}, State = #state{pool_pid = PoolPid, q = Q}) -> + NQ = queue:join(Q, queue:from_list(Points)), + + %% 超过缓冲区设置的大小则批量导入 + Len = queue:len(NQ), + case Len >= ?BUFFER_SIZE of + true -> + poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, <<"ms">>, queue:to_list(NQ)) end), + {noreply, State#state{q = queue:new()}}; + false -> + {noreply, State#state{q = NQ}} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, flush_ticker}, State = #state{q = Q, pool_pid = PoolPid}) -> + erlang:start_timer(5000, self(), flush_ticker), + Len = queue:len(Q), + Len > 0 andalso poolboy:transaction(PoolPid, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, <<"ms">>, queue:to_list(Q)) end), + lager:debug("[influx_monitor_client_pool] flush_ticker acc write num: ~p", [Len]), + + {noreply, State#state{q = queue:new()}}; + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 8ae835f..2d5e6eb 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -25,6 +25,8 @@ %% 设备id device_id :: integer(), device_uuid :: binary(), + %% 主机的id + host_id :: integer(), %% 设备是否授权 auth_status :: integer(), %% 事件触发周期 @@ -35,6 +37,12 @@ %% 设备类型 model_id = 0, + %% 设备状态轮询周期 + poll_interval = 0 :: integer(), + poll_ref :: undefined | reference(), + %% 设备的最后状态,避免重复更新数据库 + edge_status = 0 :: integer(), + status = ?DEVICE_OFFLINE }). @@ -139,11 +147,19 @@ init([DeviceUUID]) when is_binary(DeviceUUID) -> lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), ignore end; -init([#{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) -> + +init([DeviceInfo = #{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"host_id">> := HostId, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) -> {ok, AiEventThrottle} = application:get_env(iot, ai_event_throttle), - {ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, - status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}. + EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0), + + %% 轮询机制 + {ok, PollInterval0} = application:get_env(iot, device_poll_interval), + PollInterval = PollInterval0 * 1000, + erlang:start_timer(PollInterval, self(), poll_ticker), + + {ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus, + status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId), poll_interval = PollInterval}}. %% @private %% @doc Handling call messages @@ -266,6 +282,54 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, poll_ticker}, State = #state{poll_interval = PollInterval, device_uuid = DeviceUUID, host_id = HostId}) -> + %% 开启下一次轮询 + erlang:start_timer(PollInterval, self(), poll_ticker), + + AliasName = iot_host:get_alias_name(HostId), + case global:whereis_name(AliasName) of + undefined -> + lager:warning("[iot_device] device_uuid: ~p, host not found", [DeviceUUID]), + {noreply, State}; + HostPid when is_pid(HostPid) -> + %% 建立临时任务,不阻塞当前进程 + Ref = make_ref(), + {ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(), + iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, 10), + + {noreply, State#state{poll_ref = Ref}} + end; + +%% 处理响应 +handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) -> + case EdgeStatus =/= LastEdgeStatus of + true -> + lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]), + save_device_edge_status(DeviceUUID, EdgeStatus, LastEdgeStatus, Status), + + {noreply, State#state{edge_status = EdgeStatus}}; + false -> + {noreply, State} + end; + +%% 通讯超时逻辑需要处理 +handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) -> + lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]), + EdgeStatus = -1, + case EdgeStatus =/= LastEdgeStatus of + true -> + save_device_edge_status(DeviceUUID, EdgeStatus, LastEdgeStatus, Status), + + {noreply, State#state{edge_status = EdgeStatus}}; + false -> + {noreply, State} + end; + +%% 其他类型的错误暂时不更新数据库状态 +handle_info({poll_task_reply, Ref, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, poll_ref = Ref}) -> + lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]), + {noreply, State}; + handle_info(_Info, State = #state{}) -> {noreply, State}. @@ -343,4 +407,19 @@ rewrite_ai_event(EventType, Params) when is_integer(EventType), is_map(Params) - {ok, Params#{<<"event_code">> => EventCode, <<"description">> => EventName}}; undefined -> error - end. \ No newline at end of file + end. + +-spec save_device_edge_status(DeviceUUID :: binary(), EdgeStatus :: integer(), LastEdgeStatus :: integer(), Status :: integer()) -> no_return(). +save_device_edge_status(DeviceUUID, EdgeStatus, LastEdgeStatus, Status) + when is_binary(DeviceUUID), is_integer(EdgeStatus), is_integer(LastEdgeStatus), is_integer(Status) -> + + %% 更新数据库 + {ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus), + %% 写入influx表 + Tags = #{<<"device_uuid">> => DeviceUUID}, + Fields = #{ + <<"edge_status">> => EdgeStatus, + <<"last_edge_status">> => LastEdgeStatus, + <<"status">> => Status + }, + influx_monitor_client_pool:write_data(<<"device_status">>, Tags, Fields, iot_util:timestamp()). \ No newline at end of file diff --git a/apps/iot/src/iot_device_poll_task.erl b/apps/iot/src/iot_device_poll_task.erl new file mode 100644 index 0000000..585f80d --- /dev/null +++ b/apps/iot/src/iot_device_poll_task.erl @@ -0,0 +1,135 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 10. 3月 2025 21:09 +%%%------------------------------------------------------------------- +-module(iot_device_poll_task). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_monitor/0]). +-export([poll_task/6]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec poll_task(Pid :: pid(), Ref :: reference(), HostPid :: pid(), ReceiverPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return(). +poll_task(Pid, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout) + when is_pid(Pid), is_reference(Ref), is_pid(HostPid), is_pid(ReceiverPid), is_binary(DeviceUUID), is_integer(Timeout) -> + gen_server:cast(Pid, {poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_monitor() -> + {'ok', {Pid :: pid(), MonRef :: reference()}} | 'ignore' | {'error', Reason :: term()}). +start_monitor() -> + gen_server:start_monitor(?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}, State = #state{}) -> + Command = #{ + <<"device_uuid">> => DeviceUUID, + <<"timeout">> => Timeout, + <<"command">> => <<"query_status">> + }, + BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])), + + case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of + {ok, Reply} when is_binary(Reply) -> + case catch jiffy:decode(Reply, [return_maps]) of + %% 返回的是一个数组 + #{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 -> + EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList), + case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of + true -> + ReceiverPid ! {poll_task_reply, Ref, {ok, 1}}; + false -> + case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of + true -> + ReceiverPid ! {poll_task_reply, Ref, {ok, 0}}; + false -> + ReceiverPid ! {poll_task_reply, Ref, {ok, lists:min(EdgeStatusList)}} + end + end; + _ -> + ReceiverPid ! {poll_task_reply, Ref, {error, <<"invalid reply json">>}} + end; + {error, Reason} -> + ReceiverPid ! {poll_task_reply, Ref, {error, Reason}} + end, + {stop, normal, State}; +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 8d7ab89..03834d8 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -46,6 +46,15 @@ init([]) -> modules => ['influx_client_pool'] }, + #{ + id => influx_monitor_client_pool, + start => {'influx_monitor_client_pool', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['influx_monitor_client_pool'] + }, + #{ id => 'iot_task', start => {'iot_task', start_link, []}, diff --git a/config/sys-dev.config b/config/sys-dev.config index 2e8851f..ee29974 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -14,6 +14,9 @@ %% 数据的最大缓存量 {device_cache_size, 200}, + %% 设备状态轮询周期,单位秒 + {device_poll_interval, 300}, + %% 事件的间隔处理逻辑 {ai_event_throttle, #{ 15 => 300 @@ -86,6 +89,16 @@ ]} ]}, + %% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234 + {influx_monitor_pool, [ + {pool_size, 100}, + {worker_args, [ + {host, "39.98.184.67"}, + {port, 18086}, + {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} + ]} + ]}, + %% 智慧监控平台 {donghuoliren, [ {url, "https://xsdc.njau.edu.cn/hq-cyaqjg/rest/rgkSmart/push"}, @@ -153,7 +166,7 @@ {handlers, [ %% debug | info | warning | error, 日志级别 - {lager_console_backend, debug}, + {lager_console_backend, warning}, {lager_file_backend, [{file, "debug.log"}, {level, debug}, {size, 314572800}]}, {lager_file_backend, [{file, "notice.log"}, {level, notice}, {size, 314572800}]}, {lager_file_backend, [{file, "error.log"}, {level, error}, {size, 314572800}]}, diff --git a/config/sys-prod.config b/config/sys-prod.config index cc51b3e..f2f7874 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -14,6 +14,9 @@ %% 数据的最大缓存量 {device_cache_size, 200}, + %% 设备状态轮询周期,单位秒 + {device_poll_interval, 300}, + %% 事件的间隔处理逻辑 {ai_event_throttle, #{ 15 => 300 @@ -75,6 +78,16 @@ ]} ]}, + %% influxdb监控数据库配置, 共用同一个influxdb + {influx_monitor_pool, [ + {pool_size, 100}, + {worker_args, [ + {host, "172.19.0.4"}, + {port, 8086}, + {token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>} + ]} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, diff --git a/config/sys-test.config b/config/sys-test.config index c91219c..17be5ac 100644 --- a/config/sys-test.config +++ b/config/sys-test.config @@ -14,6 +14,9 @@ %% 数据的最大缓存量 {device_cache_size, 200}, + %% 设备状态轮询周期,单位秒 + {device_poll_interval, 300}, + {watchdog, [ {pri_key, "jinzhi_watchdog_pri.key"}, {url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"}, @@ -59,6 +62,15 @@ ]} ]}, + {influx_monitor_pool, [ + {pool_size, 100}, + {worker_args, [ + {host, "172.16.0.17"}, + {port, 8086}, + {token, <<"_p7ehr7STau3WRk4Iy94diB-8i5gdhK7fI9H2bpJmVWKVMX57DqBwhS7ln2gkU3Q2Oy6vnTOqBXB5ilLl_2xAg==">>} + ]} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, diff --git a/docs/publish_command.md b/docs/publish_command.md index c46f7da..7ff08fb 100644 --- a/docs/publish_command.md +++ b/docs/publish_command.md @@ -92,4 +92,29 @@ "scene_id": "scene_id" } ``` - ```ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss \ No newline at end of file + ```ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss + +## 设备状态轮询 + +## 2. 下发的数据格式如下 + <<17:8, Body:任意长度(先json序列化,然后aes加密)>>, 其中 + + Body: + ```json + { + "device_uuid": "设备的uuid", + // 命令执行的超时时间,单位为秒 + "t": 10, + "command": "query_status" + } + ``` + + 返回值: + + ```json + { + "edge_status": int, + "message": "描述信息" + } + ``` + \ No newline at end of file