From cfedc75e1e984f187ba1b7ad4ad9e6e738d6af48 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Thu, 31 Aug 2023 16:12:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8Eudp?= =?UTF-8?q?=E7=9A=84=E5=BF=83=E8=B7=B3=E6=A3=80=E6=9F=A5=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 1 + apps/iot/src/http_handler/host_handler.erl | 10 ++ apps/iot/src/iot_app.erl | 11 +- apps/iot/src/iot_device.erl | 14 +-- apps/iot/src/iot_host.erl | 137 +++++++++++++-------- config/sys-dev.config | 4 + config/sys-prod.config | 4 + 7 files changed, 116 insertions(+), 65 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 9f353da..8936196 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -15,6 +15,7 @@ %% 设备是否在线状态 -define(DEVICE_OFFLINE, 0). -define(DEVICE_ONLINE, 1). +-define(DEVICE_UNKNOWN, 2). %% 下发的任务状态 -define(TASK_STATUS_INIT, -1). %% 未接入 diff --git a/apps/iot/src/http_handler/host_handler.erl b/apps/iot/src/http_handler/host_handler.erl index ff5feae..c005f59 100644 --- a/apps/iot/src/http_handler/host_handler.erl +++ b/apps/iot/src/http_handler/host_handler.erl @@ -32,6 +32,16 @@ handle_request("GET", "/host/metric", #{<<"uuid">> := UUID}, _) -> end end; +%% 处理主机的授权的 取消激活 +handle_request("GET", "/host/status", _, #{<<"uuid">> := UUID}) when is_binary(UUID) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + {ok, StatusInfo} = iot_host:get_status(Pid), + {ok, 200, iot_util:json_data(StatusInfo)} + end; + %% 重新加载对应的主机信息 handle_request("POST", "/host/reload", _, #{<<"uuid">> := UUID}) when is_binary(UUID) -> lager:debug("[host_handler] will reload host uuid: ~p", [UUID]), diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index cda4d95..22af7b9 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -19,6 +19,8 @@ start(_StartType, _StartArgs) -> %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), + %% 启动udp服务 + start_udp_server(), %% 启动http服务 start_http_server(), %% 启动redis服务器 @@ -84,7 +86,14 @@ start_redis_server() -> ], {ok, _} = esockd:open(redis_server, Port, TransOpts, {redis_protocol, start_link, []}), - lager:debug("[iot_app] the rpc server start at: ~p", [Port]). + lager:debug("[iot_app] the redis server start at: ~p", [Port]). + +start_udp_server() -> + {ok, Props} = application:get_env(iot, udp_server), + Port = proplists:get_value(port, Props), + UdpOpts = [{udp_options, [binary, {reuseaddr, true}]}], + {ok, _} = esockd:open_udp(iot_udp_server, Port, UdpOpts, {iot_udp_server, start_link, []}), + lager:debug("[iot_app] the udp server start at: ~p", [Port]). %% 启动内存数据库 start_mnesia() -> diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index b174c2f..569ba0d 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -16,7 +16,7 @@ -export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2]). %% gen_statem callbacks --export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). %% 终端是否授权 -define(DEVICE_AUTH_DENIED, 0). @@ -43,7 +43,9 @@ get_pid(DeviceUUID) when is_binary(DeviceUUID) -> get_name(DeviceUUID) when is_binary(DeviceUUID) -> binary_to_atom(<<"iot_device:", DeviceUUID/binary>>). --spec is_activated(Pid :: pid()) -> boolean(). +-spec is_activated(Pid :: pid() | undefined) -> boolean(). +is_activated(undefined) -> + false; is_activated(Pid) when is_pid(Pid) -> gen_statem:call(Pid, is_activated). @@ -103,14 +105,6 @@ init0(#{<<"device_uuid">> := DeviceUUID, <<"status">> := Status, <<"authorize_st callback_mode() -> handle_event_function. -%% @private -%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or -%% (2) when gen_statem terminates abnormally. -%% This callback is optional. -format_status(_Opt, [_PDict, _StateName, _State]) -> - Status = some_term, - Status. - %% @private %% @doc If callback_mode is handle_event_function, then whenever a %% gen_statem receives an event from call/2, cast/2, or as a normal diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index b38c7e4..08ae114 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -16,14 +16,18 @@ -define(STATE_DENIED, denied). -define(STATE_SESSION, session). +%% 心跳包检测时间间隔, 2分钟检测一次 +-define(HEARTBEAT_INTERVAL, 120 * 1000). + %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, publish_message/4, get_aes/1]). +-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). -export([create_session/2, attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). +-export([heartbeat/1]). %% gen_statem callbacks --export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). -record(state, { host_id :: integer(), @@ -32,6 +36,9 @@ %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), + %% 心跳计数器 + heartbeat_counter = 0 :: integer(), + %% websocket相关 channel_pid :: undefined | pid(), monitor_ref :: undefined | reference(), @@ -67,6 +74,10 @@ handle(Pid, Packet) when is_pid(Pid) -> get_aes(Pid) when is_pid(Pid) -> gen_statem:call(Pid, get_aes). +-spec get_status(Pid :: pid()) -> {ok, Status :: map()}. +get_status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_status). + %% 激活主机, true 表示激活; false表示关闭激活 -spec activate(Pid :: pid(), Auth :: boolean()) -> ok. activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> @@ -116,6 +127,12 @@ delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) -> gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}). +-spec heartbeat(Pid :: pid()) -> no_return(). +heartbeat(undefined) -> + ok; +heartbeat(Pid) when is_pid(Pid) -> + gen_statem:cast(Pid, heartbeat). + %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. @@ -131,17 +148,21 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([UUID]) -> - %% 重启时,认为主机是离线状态; 等待主机主动建立连接 {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - case host_bo:get_host_by_uuid(UUID) of {ok, #{<<"id">> := HostId}} -> %% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid AliasName = get_alias_name(HostId), global:register_name(AliasName, self()), - Aes = list_to_binary(iot_util:rand_bytes(32)), + %% 启动主机相关的devices + {ok, Devices} = device_bo:get_host_devices(HostId), + lists:foreach(fun iot_device_sup:ensured_device_started/1, Devices), + + %% 心跳检测机制 + erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), + {ok, ?STATE_DENIED, #state{host_id = HostId, uuid = UUID, aes = Aes}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), @@ -154,14 +175,6 @@ init([UUID]) -> callback_mode() -> handle_event_function. -%% @private -%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or -%% (2) when gen_statem terminates abnormally. -%% This callback is optional. -format_status(_Opt, [_PDict, _StateName, _State]) -> - Status = some_term, - Status. - %% @private %% @doc If callback_mode is handle_event_function, then whenever a %% gen_statem receives an event from call/2, cast/2, or as a normal @@ -172,6 +185,31 @@ handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) -> handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> {keep_state, State, [{reply, From, {ok, Aes}}]}; +%% 获取主机的状态 +handle_event({call, From}, get_status, StateName, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics}) -> + %% 启动主机相关的devices + {ok, Devices} = device_bo:get_host_devices(HostId), + DeviceInfos = lists:map(fun(DeviceUUID) -> + DevicePid = iot_device:get_pid(DeviceUUID), + case iot_device:is_activated(DevicePid) of + true -> + {DeviceUUID, <<"online">>}; + false -> + {DeviceUUID, <<"offline">>} + end + end, Devices), + + Status = if StateName == ?STATE_SESSION -> <<"session">>; true -> <<"denied">> end, + HasChannel = (ChannelPid /= undefined), + Reply = #{ + <<"status">> => Status, + <<"has_channel">> => HasChannel, + <<"heartbeat_counter">> => HeartbeatCounter, + <<"metrics">> => Metrics, + <<"device_infos">> => DeviceInfos + }, + {keep_state, State, [{reply, From, {ok, Reply}}]}; + %% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的 handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_SESSION, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid}) -> lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]), @@ -193,22 +231,20 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State) -> {keep_state, State, [{reply, From, {error, <<"主机状态错误,发送命令失败"/utf8>>}}]}; %% 关闭授权 -handle_event({call, From}, {activate, Auth}, StateName, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid}) -> - case {StateName, Auth} of - {?STATE_DENIED, _} -> - {keep_state, State, [{reply, From, ok}]}; - {?STATE_SESSION, false} -> - %% 取消之前的monitor - erlang:demonitor(MRef), - ws_channel:stop(ChannelPid, closed), +handle_event({call, From}, {activate, _}, ?STATE_DENIED, State) -> + {keep_state, State, [{reply, From, ok}]}; - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - change_devices_status(HostId, ?DEVICE_OFFLINE), +handle_event({call, From}, {activate, true}, ?STATE_SESSION, State) -> + {keep_state, State, [{reply, From, ok}]}; +handle_event({call, From}, {activate, false}, ?STATE_SESSION, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid}) -> + %% 取消之前的monitor + erlang:demonitor(MRef), + ws_channel:stop(ChannelPid, closed), - {next_state, ?STATE_DENIED, State#state{monitor_ref = undefined, channel_pid = undefined}, [{reply, From, ok}]}; - {?STATE_SESSION, true} -> - {keep_state, State, [{reply, From, ok}]} - end; + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_UNKNOWN), + + {next_state, ?STATE_DENIED, State#state{monitor_ref = undefined, channel_pid = undefined}, [{reply, From, ok}]}; %% 绑定channel handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> @@ -226,7 +262,7 @@ handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid {keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]}; %% 授权通过后,才能将主机的状态设置为在线状态 -handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UUID, aes = Aes, host_id = HostId}) -> +handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UUID, aes = Aes}) -> {ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID), case AuthorizeStatus =:= 1 of true -> @@ -235,9 +271,6 @@ handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UU {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [?STATE_SESSION, UUID, AffectedRow]), - %% 启动主机相关的device - start_devices(HostId), - {next_state, ?STATE_SESSION, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; false -> lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), @@ -378,19 +411,27 @@ handle_event(cast, {handle, {event, Event0}}, ?STATE_SESSION, State = #state{uui end, {keep_state, State}; -%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 -handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{host_id = HostId, uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) -> - lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]), - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - change_devices_status(HostId, ?DEVICE_OFFLINE), +%% 心跳机制 +handle_event(cast, heartbeat, _, State = #state{uuid = UUID, heartbeat_counter = HeartbeatCounter}) -> + lager:debug("[iot_host] uuip: ~p, get heartbeat, counter is: ~p", [UUID, HeartbeatCounter]), + {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}}; - %% 会话状态如果链接丢失,需要切换到activated状态,其他情况保持不变 - case StateName =:= session of +handle_event(info, {timeout, _, heartbeat_ticker}, StateName, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = HeartbeatCounter}) -> + case StateName =:= ?STATE_DENIED andalso HeartbeatCounter =:= 0 of true -> - {next_state, activated, State#state{channel_pid = undefined}}; + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_UNKNOWN); false -> - {keep_state, State#state{channel_pid = undefined}} - end; + ok + end, + erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), + + {keep_state, State#state{heartbeat_counter = 0}}; + +%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 +handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) -> + lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [UUID, ChannelPid, Reason, StateName, State]), + {next_state, ?STATE_DENIED, State#state{channel_pid = undefined}}; handle_event(EventType, EventContent, StateName, State) -> lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), @@ -405,7 +446,7 @@ handle_event(EventType, EventContent, StateName, State) -> terminate(Reason, StateName, _State = #state{host_id = HostId, uuid = UUID}) -> lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]), host_bo:change_status(UUID, ?HOST_OFFLINE), - change_devices_status(HostId, ?DEVICE_OFFLINE), + change_devices_status(HostId, ?DEVICE_UNKNOWN), ok. %% @private @@ -417,18 +458,6 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -start_devices(HostId) when is_integer(HostId) -> - %% 启动主机相关的device,此时device的状态为离线状态 - {ok, Devices} = device_bo:get_host_devices(HostId), - lists:foreach(fun(DeviceUUID) -> - case iot_device_sup:ensured_device_started(DeviceUUID) of - {ok, DevicePid} -> - iot_device:change_status(DevicePid, ?DEVICE_OFFLINE); - {error, Reason} -> - lager:notice("[iot_host] start_device: ~p, get error: ~p", [DeviceUUID, Reason]) - end - end, Devices). - %% 修改设备的状态 change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) -> {ok, Devices} = device_bo:get_host_devices(HostId), diff --git a/config/sys-dev.config b/config/sys-dev.config index 1d26a19..f8ef5c9 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -14,6 +14,10 @@ {backlog, 10240} ]}, + {udp_server, [ + {port, 18080} + ]}, + %% 目标服务器地址 {emqx_server, [ {host, {39, 98, 184, 67}}, diff --git a/config/sys-prod.config b/config/sys-prod.config index 67cdc7d..71f660c 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -14,6 +14,10 @@ {backlog, 10240} ]}, + {udp_server, [ + {port, 18080} + ]}, + %% 目标服务器地址 %{emqx_server, [ % {host, {39, 98, 184, 67}},