简化poll模型
This commit is contained in:
parent
3050841280
commit
76edfcd11f
@ -37,9 +37,6 @@
|
|||||||
%% 设备类型
|
%% 设备类型
|
||||||
model_id = 0,
|
model_id = 0,
|
||||||
|
|
||||||
%% 设备状态轮询周期
|
|
||||||
poll_interval = 0 :: integer(),
|
|
||||||
poll_ref :: undefined | reference(),
|
|
||||||
%% 设备的最后状态,避免重复更新数据库
|
%% 设备的最后状态,避免重复更新数据库
|
||||||
edge_status = 0 :: integer(),
|
edge_status = 0 :: integer(),
|
||||||
|
|
||||||
@ -131,13 +128,8 @@ init([DeviceInfo = #{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"h
|
|||||||
|
|
||||||
EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0),
|
EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0),
|
||||||
|
|
||||||
%% 轮询机制
|
|
||||||
{ok, PollInterval0} = application:get_env(iot, device_poll_interval),
|
|
||||||
PollInterval = PollInterval0 * 1000,
|
|
||||||
erlang:start_timer(PollInterval, self(), poll_ticker),
|
|
||||||
|
|
||||||
{ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus,
|
{ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus,
|
||||||
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId), poll_interval = PollInterval}}.
|
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -260,26 +252,8 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, poll_ticker}, State = #state{poll_interval = PollInterval, device_uuid = DeviceUUID, host_id = HostId}) ->
|
|
||||||
%% 开启下一次轮询
|
|
||||||
erlang:start_timer(PollInterval, self(), poll_ticker),
|
|
||||||
|
|
||||||
AliasName = iot_host:get_alias_name(HostId),
|
|
||||||
case global:whereis_name(AliasName) of
|
|
||||||
undefined ->
|
|
||||||
lager:warning("[iot_device] device_uuid: ~p, host not found", [DeviceUUID]),
|
|
||||||
{noreply, State};
|
|
||||||
HostPid when is_pid(HostPid) ->
|
|
||||||
%% 建立临时任务,不阻塞当前进程
|
|
||||||
Ref = make_ref(),
|
|
||||||
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
|
|
||||||
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, 10),
|
|
||||||
|
|
||||||
{noreply, State#state{poll_ref = Ref}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 处理响应
|
%% 处理响应
|
||||||
handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
|
handle_info({poll_task_reply, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) ->
|
||||||
case EdgeStatus =/= LastEdgeStatus of
|
case EdgeStatus =/= LastEdgeStatus of
|
||||||
true ->
|
true ->
|
||||||
lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]),
|
lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]),
|
||||||
@ -291,7 +265,7 @@ handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 通讯超时逻辑需要处理
|
%% 通讯超时逻辑需要处理
|
||||||
handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
|
handle_info({poll_task_reply, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) ->
|
||||||
lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]),
|
lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]),
|
||||||
EdgeStatus = -1,
|
EdgeStatus = -1,
|
||||||
case EdgeStatus =/= LastEdgeStatus of
|
case EdgeStatus =/= LastEdgeStatus of
|
||||||
@ -304,7 +278,7 @@ handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 其他类型的错误暂时不更新数据库状态
|
%% 其他类型的错误暂时不更新数据库状态
|
||||||
handle_info({poll_task_reply, Ref, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, poll_ref = Ref}) ->
|
handle_info({poll_task_reply, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus}) ->
|
||||||
lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]),
|
lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
|||||||
@ -1,135 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author anlicheng
|
|
||||||
%%% @copyright (C) 2025, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 10. 3月 2025 21:09
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(iot_device_poll_task).
|
|
||||||
-author("anlicheng").
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_monitor/0]).
|
|
||||||
-export([poll_task/6]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-record(state, {}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
-spec poll_task(Pid :: pid(), Ref :: reference(), HostPid :: pid(), ReceiverPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return().
|
|
||||||
poll_task(Pid, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout)
|
|
||||||
when is_pid(Pid), is_reference(Ref), is_pid(HostPid), is_pid(ReceiverPid), is_binary(DeviceUUID), is_integer(Timeout) ->
|
|
||||||
gen_server:cast(Pid, {poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}).
|
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_monitor() ->
|
|
||||||
{'ok', {Pid :: pid(), MonRef :: reference()}} | 'ignore' | {'error', Reason :: term()}).
|
|
||||||
start_monitor() ->
|
|
||||||
gen_server:start_monitor(?MODULE, [], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Initializes the server
|
|
||||||
-spec(init(Args :: term()) ->
|
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([]) ->
|
|
||||||
{ok, #state{}}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling call messages
|
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast({poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}, State = #state{}) ->
|
|
||||||
Command = #{
|
|
||||||
<<"device_uuid">> => DeviceUUID,
|
|
||||||
<<"timeout">> => Timeout,
|
|
||||||
<<"command">> => <<"query_status">>
|
|
||||||
},
|
|
||||||
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
|
|
||||||
|
|
||||||
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
|
|
||||||
{ok, Reply} when is_binary(Reply) ->
|
|
||||||
case catch jiffy:decode(Reply, [return_maps]) of
|
|
||||||
%% 返回的是一个数组
|
|
||||||
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
|
|
||||||
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
|
|
||||||
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
|
|
||||||
true ->
|
|
||||||
ReceiverPid ! {poll_task_reply, Ref, {ok, 1}};
|
|
||||||
false ->
|
|
||||||
case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of
|
|
||||||
true ->
|
|
||||||
ReceiverPid ! {poll_task_reply, Ref, {ok, 0}};
|
|
||||||
false ->
|
|
||||||
ReceiverPid ! {poll_task_reply, Ref, {ok, lists:min(EdgeStatusList)}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
ReceiverPid ! {poll_task_reply, Ref, {error, <<"invalid reply json">>}}
|
|
||||||
end;
|
|
||||||
{error, Reason} ->
|
|
||||||
ReceiverPid ! {poll_task_reply, Ref, {error, Reason}}
|
|
||||||
end,
|
|
||||||
{stop, normal, State};
|
|
||||||
handle_cast(_Request, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling all non call/cast messages
|
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_info(_Info, State = #state{}) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(_Reason, _State = #state{}) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
@ -43,7 +43,10 @@
|
|||||||
%% websocket相关
|
%% websocket相关
|
||||||
channel_pid :: undefined | pid(),
|
channel_pid :: undefined | pid(),
|
||||||
%% 主机的相关信息
|
%% 主机的相关信息
|
||||||
metrics = #{} :: map()
|
metrics = #{} :: map(),
|
||||||
|
|
||||||
|
%% 设备状态轮询器
|
||||||
|
poller_pid :: pid()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -199,10 +202,13 @@ init([UUID]) ->
|
|||||||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
|
|
||||||
StateName = case AuthorizeStatus =:= 1 of
|
StateName = case AuthorizeStatus =:= 1 of
|
||||||
true -> ?STATE_ACTIVATED;
|
true ->
|
||||||
false -> ?STATE_DENIED
|
?STATE_ACTIVATED;
|
||||||
|
false ->
|
||||||
|
?STATE_DENIED
|
||||||
end,
|
end,
|
||||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}};
|
{ok, PollerPid} = iot_host_device_poller:start_link(HostId, self()),
|
||||||
|
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, poller_pid = PollerPid, has_session = false}};
|
||||||
undefined ->
|
undefined ->
|
||||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||||
ignore
|
ignore
|
||||||
@ -305,25 +311,35 @@ handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid =
|
|||||||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
||||||
|
|
||||||
%% 激活主机
|
%% 激活主机
|
||||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) ->
|
||||||
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
||||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
|
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
|
||||||
|
%% 启动轮询
|
||||||
|
iot_host_device_poller:boot(PollerPid),
|
||||||
|
|
||||||
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]),
|
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]),
|
||||||
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
%% 关闭授权
|
%% 关闭授权
|
||||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, poller_pid = PollerPid}) when is_pid(ChannelPid) ->
|
||||||
BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]),
|
BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]),
|
||||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||||
ws_channel:stop(ChannelPid, closed),
|
ws_channel:stop(ChannelPid, closed),
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
|
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
|
||||||
|
|
||||||
|
%% 暂停轮询
|
||||||
|
iot_host_device_poller:pause(PollerPid),
|
||||||
|
|
||||||
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
|
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
|
||||||
|
|
||||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, poller_pid = PollerPid, channel_pid = undefined}) ->
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]),
|
lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]),
|
||||||
|
%% 暂停轮询
|
||||||
|
iot_host_device_poller:pause(PollerPid),
|
||||||
|
|
||||||
{next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]};
|
{next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]};
|
||||||
|
|
||||||
%% 绑定channel
|
%% 绑定channel
|
||||||
@ -337,7 +353,7 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c
|
|||||||
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
|
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
|
||||||
|
|
||||||
%% 授权通过后,才能将主机的状态设置为在线状态
|
%% 授权通过后,才能将主机的状态设置为在线状态
|
||||||
handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes, name = Name}) ->
|
handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, poller_pid = PollerPid, aes = Aes, name = Name}) ->
|
||||||
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
||||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
||||||
@ -348,6 +364,9 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #
|
|||||||
|
|
||||||
report_event(UUID, ?HOST_ONLINE),
|
report_event(UUID, ?HOST_ONLINE),
|
||||||
|
|
||||||
|
%% 启动轮询
|
||||||
|
iot_host_device_poller:boot(PollerPid),
|
||||||
|
|
||||||
lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]),
|
lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]),
|
||||||
{keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
{keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||||
|
|
||||||
@ -358,30 +377,42 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #sta
|
|||||||
{keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
{keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||||
|
|
||||||
%% 重新加载设备信息
|
%% 重新加载设备信息
|
||||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State) ->
|
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) ->
|
||||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||||
{ok, DevicePid} ->
|
{ok, DevicePid} ->
|
||||||
iot_device:reload(DevicePid),
|
iot_device:reload(DevicePid),
|
||||||
|
%% 增加设备
|
||||||
|
iot_host_device_poller:add_device(PollerPid, DeviceUUID),
|
||||||
|
|
||||||
{keep_state, State, [{reply, From, ok}]};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{keep_state, State, [{reply, From, {error, Reason}}]}
|
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 删除设备
|
%% 删除设备
|
||||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State) ->
|
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{poller_pid = PollerPid}) ->
|
||||||
case iot_device:get_pid(DeviceUUID) of
|
case iot_device:get_pid(DeviceUUID) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
DevicePid when is_pid(DevicePid) ->
|
DevicePid when is_pid(DevicePid) ->
|
||||||
|
%% 删除设备
|
||||||
|
iot_host_device_poller:delete_device(PollerPid, DeviceUUID),
|
||||||
|
|
||||||
iot_device_sup:delete_device(DeviceUUID)
|
iot_device_sup:delete_device(DeviceUUID)
|
||||||
end,
|
end,
|
||||||
{keep_state, State, [{reply, From, ok}]};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
%% 激活设备
|
%% 激活设备
|
||||||
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) ->
|
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{poller_pid = PollerPid}) ->
|
||||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||||
{ok, DevicePid} ->
|
{ok, DevicePid} ->
|
||||||
iot_device:auth(DevicePid, Auth),
|
iot_device:auth(DevicePid, Auth),
|
||||||
|
case Auth of
|
||||||
|
true ->
|
||||||
|
iot_host_device_poller:add_device(PollerPid, DeviceUUID);
|
||||||
|
false ->
|
||||||
|
iot_host_device_poller:delete_device(PollerPid, DeviceUUID)
|
||||||
|
end,
|
||||||
|
|
||||||
{keep_state, State, [{reply, From, ok}]};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
|||||||
220
apps/iot/src/iot_host_device_poller.erl
Normal file
220
apps/iot/src/iot_host_device_poller.erl
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author anlicheng
|
||||||
|
%%% @copyright (C) 2025, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 29. 4月 2025 12:19
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_host_device_poller).
|
||||||
|
-author("anlicheng").
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/2]).
|
||||||
|
-export([boot/1, pause/1, add_device/2, delete_device/2]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-define(STATUS_BOOTING, booting).
|
||||||
|
-define(STATUS_PAUSED, paused).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
host_id :: integer(),
|
||||||
|
host_pid :: pid(),
|
||||||
|
devices :: [binary()],
|
||||||
|
status = ?STATUS_PAUSED,
|
||||||
|
%% 当前的任务集合
|
||||||
|
tasks = []
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
boot(Pid) when is_pid(Pid) ->
|
||||||
|
gen_server:cast(Pid, boot).
|
||||||
|
|
||||||
|
pause(Pid) when is_pid(Pid) ->
|
||||||
|
gen_server:cast(Pid, pause).
|
||||||
|
|
||||||
|
add_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||||
|
gen_server:cast(Pid, {add_device, DeviceUUID}).
|
||||||
|
|
||||||
|
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||||
|
gen_server:cast(Pid, {delete_device, DeviceUUID}).
|
||||||
|
|
||||||
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
|
-spec(start_link(HostId :: integer(), HostPid :: pid()) ->
|
||||||
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
|
start_link(HostId, HostPid) when is_integer(HostId), is_pid(HostPid) ->
|
||||||
|
gen_server:start_link(?MODULE, [HostId, HostPid], []).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Initializes the server
|
||||||
|
-spec(init(Args :: term()) ->
|
||||||
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term()} | ignore).
|
||||||
|
init([HostId, HostPid]) ->
|
||||||
|
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||||
|
{ok, #state{host_id = HostId, host_pid = HostPid, devices = Devices}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling call messages
|
||||||
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
|
State :: #state{}) ->
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_call(_Request, _From, State = #state{}) ->
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling cast messages
|
||||||
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
%% 启动
|
||||||
|
handle_cast(boot, State = #state{status = ?STATUS_BOOTING}) ->
|
||||||
|
{noreply, State};
|
||||||
|
handle_cast(boot, State = #state{status = ?STATUS_PAUSED, devices = Devices}) ->
|
||||||
|
case length(Devices) > 0 of
|
||||||
|
true ->
|
||||||
|
trigger_task(0);
|
||||||
|
false ->
|
||||||
|
trigger_task(5000)
|
||||||
|
end,
|
||||||
|
{noreply, State#state{status = ?STATUS_BOOTING, tasks = Devices}};
|
||||||
|
|
||||||
|
%% 暂停
|
||||||
|
handle_cast(pause, State = #state{status = ?STATUS_PAUSED}) ->
|
||||||
|
{noreply, State};
|
||||||
|
handle_cast(pause, State = #state{status = ?STATUS_BOOTING}) ->
|
||||||
|
{noreply, State#state{status = ?STATUS_PAUSED, tasks = []}};
|
||||||
|
|
||||||
|
%% 添加设备
|
||||||
|
handle_cast({add_device, DeviceUUID}, State = #state{devices = Devices}) ->
|
||||||
|
NDevices = case lists:member(DeviceUUID, Devices) of
|
||||||
|
true ->
|
||||||
|
Devices;
|
||||||
|
false ->
|
||||||
|
[DeviceUUID|Devices]
|
||||||
|
end,
|
||||||
|
{noreply, State#state{devices = NDevices}};
|
||||||
|
|
||||||
|
%% 删除设备
|
||||||
|
handle_cast({delete_device, DeviceUUID}, State = #state{devices = Devices}) ->
|
||||||
|
{noreply, State#state{devices = lists:delete(DeviceUUID, Devices)}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling all non call/cast messages
|
||||||
|
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_info({timeout, _, task_ticker}, State = #state{host_id = HostId, host_pid = HostPid, status = Status, tasks = [DeviceUUID|Tail]}) ->
|
||||||
|
case Status of
|
||||||
|
?STATUS_BOOTING ->
|
||||||
|
lager:debug("[iot_host_device_poller] host_id: ~p, start new task for device_uuid: ~p", [HostId, DeviceUUID]),
|
||||||
|
catch task(HostPid, DeviceUUID, 5),
|
||||||
|
trigger_task(0),
|
||||||
|
{noreply, State#state{tasks = Tail}};
|
||||||
|
?STATUS_PAUSED ->
|
||||||
|
lager:debug("[iot_host_device_poller] host_id: ~p, host status is paused", [HostId]),
|
||||||
|
{noreply, State#state{tasks = []}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_info(next_task, State = #state{devices = Devices, status = Status, tasks = []}) ->
|
||||||
|
case Status of
|
||||||
|
?STATUS_BOOTING ->
|
||||||
|
case length(Devices) > 0 of
|
||||||
|
true ->
|
||||||
|
trigger_task(0);
|
||||||
|
false ->
|
||||||
|
trigger_task(5000)
|
||||||
|
end,
|
||||||
|
{noreply, State#state{tasks = Devices}};
|
||||||
|
?STATUS_PAUSED ->
|
||||||
|
{noreply, State#state{tasks = []}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
|
%% with Reason. The return value is ignored.
|
||||||
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
|
State :: #state{}) -> term()).
|
||||||
|
terminate(_Reason, _State = #state{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
|
Extra :: term()) ->
|
||||||
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec trigger_task(Time :: integer()) -> no_return().
|
||||||
|
trigger_task(Time) when is_integer(Time) ->
|
||||||
|
erlang:start_timer(Time, self(), task_ticker).
|
||||||
|
|
||||||
|
-spec task(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return().
|
||||||
|
task(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) ->
|
||||||
|
Command = #{
|
||||||
|
<<"device_uuid">> => DeviceUUID,
|
||||||
|
<<"timeout">> => Timeout,
|
||||||
|
<<"command">> => <<"query_status">>
|
||||||
|
},
|
||||||
|
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
|
||||||
|
|
||||||
|
case iot_device:get_pid(DeviceUUID) of
|
||||||
|
undefined ->
|
||||||
|
ok;
|
||||||
|
DevicePid when is_pid(DevicePid) ->
|
||||||
|
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
|
||||||
|
true ->
|
||||||
|
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
|
||||||
|
{ok, Reply} when is_binary(Reply) ->
|
||||||
|
case catch jiffy:decode(Reply, [return_maps]) of
|
||||||
|
%% 返回的是一个数组
|
||||||
|
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
|
||||||
|
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
|
||||||
|
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
|
||||||
|
true ->
|
||||||
|
DevicePid ! {poll_task_reply, {ok, 1}};
|
||||||
|
false ->
|
||||||
|
case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of
|
||||||
|
true ->
|
||||||
|
DevicePid, {poll_task_reply, {ok, 0}};
|
||||||
|
false ->
|
||||||
|
DevicePid, {poll_task_reply, {ok, lists:min(EdgeStatusList)}}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
DevicePid, {poll_task_reply, {error, <<"invalid reply json">>}}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
DevicePid, {poll_task_reply, {error, Reason}}
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end.
|
||||||
Loading…
x
Reference in New Issue
Block a user