From de77d77176324ecf8d2fca727426e1a903ce8abe Mon Sep 17 00:00:00 2001 From: anlicheng Date: Mon, 14 Aug 2023 11:24:18 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E4=B8=BB=E6=9C=BA=E6=B5=81?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 8 +-- apps/iot/src/database/host_bo.erl | 35 +----------- apps/iot/src/iot_host.erl | 79 ++++++++++++++------------- apps/iot/src/websocket/ws_channel.erl | 29 +++++----- docs/websocket.md | 9 ++- 5 files changed, 69 insertions(+), 91 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index fda3d26..6a90232 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -8,11 +8,6 @@ %%%------------------------------------------------------------------- -author("licheng5"). -%% 主机状态定义 --define(HOST_STATUS_INACTIVE, -1). %% 未接入 --define(HOST_STATUS_OFFLINE, 0). %% 离线 --define(HOST_STATUS_ONLINE, 1). %% 在线 - %% 下发的任务状态 -define(TASK_STATUS_INIT, -1). %% 未接入 -define(TASK_STATUS_FAILED, 0). %% 离线 @@ -20,7 +15,7 @@ %% 主机端上报数据类型标识 %% 建立到websocket的register关系 --define(METHOD_REGISTER, 16#00). +-define(METHOD_AUTH, 16#00). -define(METHOD_CREATE_SESSION, 16#01). -define(METHOD_DATA, 16#02). @@ -28,6 +23,7 @@ -define(METHOD_INFORM, 16#04). -define(METHOD_FEEDBACK_STEP, 16#05). -define(METHOD_FEEDBACK_RESULT, 16#06). +-define(METHOD_EVENT, 16#07). %% 消息体类型 -define(PACKET_REQUEST, 16#01). diff --git a/apps/iot/src/database/host_bo.erl b/apps/iot/src/database/host_bo.erl index 81f1e98..8204534 100644 --- a/apps/iot/src/database/host_bo.erl +++ b/apps/iot/src/database/host_bo.erl @@ -11,8 +11,7 @@ -include("iot.hrl"). %% API --export([get_all_hosts/0, change_status/2, is_authorized/1, get_host/1, get_host_by_uuid/1, create_host/1]). --export([ensured_host/1]). +-export([get_all_hosts/0, change_status/2, get_host_by_uuid/1]). -spec get_all_hosts() -> UUIDList :: [binary()]. get_all_hosts() -> @@ -23,40 +22,10 @@ get_all_hosts() -> [] end. -get_host(HostId) when is_integer(HostId) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]). - get_host_by_uuid(UUID) when is_binary(UUID) -> mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]). -create_host(UUID) when is_binary(UUID) -> - mysql_pool:insert(mysql_iot, <<"host">>, #{<<"UUID">> => UUID, <<"status">> => ?HOST_STATUS_INACTIVE}, true). - %% 修改主机的状态 -spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. change_status(UUID, Status) when is_binary(UUID), is_integer(Status) -> - mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]). - -%% 判断主机是否已经授权 -is_authorized(HostId) when is_integer(HostId) -> - case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]) of - {ok, #{<<"status">> := Status}} when Status =/= ?HOST_STATUS_INACTIVE -> - true; - _ -> - false - end. - --spec ensured_host(UUID :: binary()) -> ok | {error, Reason :: any()}. -ensured_host(UUID) when is_binary(UUID) -> - %% 查找数据库,如果没有则插入 - case get_host_by_uuid(UUID) of - {ok, _} -> - ok; - undefined -> - case create_host(UUID) of - {ok, _} -> - ok; - {error, Reason} -> - {error, Reason} - end - end. \ No newline at end of file + mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]). \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 0c96620..b63e2c2 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -12,6 +12,14 @@ -behaviour(gen_statem). +%% 主机是否在线 +-define(HOST_OFFLINE, 0). +-define(HOST_ONLINE, 1). + +%% 主机是否授权 +-define(HOST_DENIED, 0). +-define(HOST_AUTHED, 1). + %% API -export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). -export([get_metric/1, publish_message/4, get_aes/1]). @@ -24,7 +32,6 @@ host_id :: integer(), %% 从数据库里面读取到的数据 uuid :: binary(), - %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), @@ -112,18 +119,17 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([UUID]) -> - case host_bo:get_host_by_uuid(UUID) of - {ok, #{<<"status">> := Status, <<"id">> := HostId}} -> - Aes = list_to_binary(iot_util:rand_bytes(32)), + %% 重启时,认为主机是离线状态; 等待主机主动建立连接 + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - StateName = case Status =:= ?HOST_STATUS_INACTIVE of - true -> - denied; - false -> - %% 重启时,认为主机是离线状态; 等待主机主动建立连接 - {ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), - activated + case host_bo:get_host_by_uuid(UUID) of + {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := HostId}} -> + Aes = list_to_binary(iot_util:rand_bytes(32)), + StateName = case AuthorizeStatus =:= ?HOST_AUTHED of + false -> denied; + true -> activated end, + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), @@ -148,10 +154,8 @@ format_status(_Opt, [_PDict, _StateName, _State]) -> %% @doc If callback_mode is handle_event_function, then whenever a %% gen_statem receives an event from call/2, cast/2, or as a normal %% process message, this function is called. -handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) -> +handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) -> {keep_state, State, [{reply, From, {ok, Metrics}}]}; -handle_event({call, From}, get_metric, _, State) -> - {keep_state, State, [{reply, From, {ok, #{}}}]}; handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> {keep_state, State, [{reply, From, {ok, Aes}}]}; @@ -165,34 +169,31 @@ handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Com {keep_state, State, [{reply, From, {ok, Ref}}]}; %% 只要channel存在,就负责将消息推送到边缘端主机 -handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, State = #state{channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> +handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> %% 通过websocket发送请求 - lager:debug("[iot_host] will publish_message: ~p", [Command]), + lager:debug("[iot_host] host: ~p, will publish_message: ~p", [UUID, Command]), Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), {keep_state, State, [{reply, From, {ok, Ref}}]}; handle_event({call, From}, {publish_message, _, _, _}, _, State) -> - {keep_state, State, [{reply, From, {error, <<"命令类型错误,发送命令失败"/utf8>>}}]}; + {keep_state, State, [{reply, From, {error, <<"主机状态错误,发送命令失败"/utf8>>}}]}; +%% 重新加载主机信息 handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) -> %% 重新加载主机信息 - case host_bo:get_host_by_uuid(UUID) of - {ok, Host = #{<<"status">> := Status}} -> - lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]), - case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of - true -> - {next_state, activated, State, [{reply, From, ok}]}; - false -> - {keep_state, State, [{reply, From, ok}]} - end; - undefined -> - lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]), - {keep_state, State, [{reply, From, {error, <<"host not found">>}}]} + {ok, Host = #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID), + lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]), + case StateName == denied andalso AuthorizeStatus =:= ?HOST_AUTHED of + true -> + {next_state, activated, State, [{reply, From, ok}]}; + false -> + {keep_state, State, [{reply, From, ok}]} end; %% 关闭授权 -handle_event({call, From}, {activate, false}, _, State) -> +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID}) -> + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), {next_state, denied, State, [{reply, From, ok}]}; %% 开启授权 handle_event({call, From}, {activate, true}, denied, State) -> @@ -225,7 +226,7 @@ handle_event({call, From}, {create_session, PubKey}, denied, State = #state{uuid handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{uuid = UUID, aes = Aes}) -> Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_STATUS_ONLINE), + {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [StateName, UUID, AffectedRow]), {next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; @@ -263,7 +264,8 @@ handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, end, {keep_state, State}; -handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) -> +%% 任意状态下都可以ping +handle_event(cast, {handle, {ping, CipherMetric}}, _, State = #state{uuid = UUID, aes = AES}) -> MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), case catch jiffy:decode(MetricsInfo, [return_maps]) of Metrics when is_map(Metrics) -> @@ -274,12 +276,11 @@ handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid {keep_state, State} end; -handle_event(cast, {handle, {inform, Info0}}, session, State = #state{host_id = HostId, aes = AES}) -> - lager:debug("[iot_host] handler inform, aes: ~p", [AES]), +handle_event(cast, {handle, {inform, Info0}}, session, State = #state{uuid = UUID, host_id = HostId, aes = AES}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"at">> := At, <<"services">> := ServiceInforms} -> - lager:debug("[iot_host] service infos is: ~p", [ServiceInforms]), + lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, ServiceInforms]), lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) -> %% props 主机id:场景id:微服务id [_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]), @@ -334,10 +335,14 @@ handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{a end, {keep_state, State}; +%% 其他情况丢弃数据 +handle_event(cast, {handle, {data, _}}, _, State) -> + {keep_state, State}; + %% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) -> lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]), - {ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), %% 会话状态如果链接丢失,需要切换到activated状态,其他情况保持不变 case StateName =:= session of @@ -359,7 +364,7 @@ handle_event(EventType, EventContent, StateName, State) -> %% Reason. The return value is ignored. terminate(Reason, StateName, _State = #state{uuid = UUID}) -> lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]), - host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), + host_bo:change_status(UUID, ?HOST_OFFLINE), ok. %% @private diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 776c906..aa121c3 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -52,24 +52,25 @@ websocket_init(_State) -> %% 初始状态为true {ok, #state{packet_id = 1}}. -websocket_handle({binary, <>}, State) -> +websocket_handle({binary, <>}, State) -> #{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]), - lager:debug("[ws_channel] register uuid: ~p, request message: ~p", [UUID, Data]), + lager:debug("[ws_channel] auth uuid: ~p, request message: ~p", [UUID, Data]), case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of true -> - %% 查找数据库,如果没有则插入 - ok = host_bo:ensured_host(UUID), - %% 尝试启动主机的服务进程 - {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), - ok = iot_host:attach_channel(HostPid, self()), - %% 建立到host的monitor - erlang:monitor(process, HostPid), + case host_bo:get_host_by_uuid(UUID) of + undefined -> + lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]), + {stop, State}; + {ok, _} -> + %% 尝试启动主机的服务进程 + {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), + ok = iot_host:attach_channel(HostPid, self()), + %% 建立到host的monitor + erlang:monitor(process, HostPid), + Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), - Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), - - lager:debug("[ws_channel] register success, host uuid: ~p", [UUID]), - - {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}}; + {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}} + end; false -> lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), {stop, State} diff --git a/docs/websocket.md b/docs/websocket.md index 1f74c1b..4d288a6 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -14,7 +14,7 @@ ## 消息类型说明 -### register消息, +### register消息 #### 请求 <<0x01, PacketId:4, Method:1, Body:任意长度>> @@ -67,6 +67,13 @@ Body: 公钥信息 PacketId: 4字节整数, 值为0; Body: 公钥信息 +### 主机上传终端设备的在线或离线事件 +<<0x01, PacketId:4, 0x07, DeviceUUID:32/binary, Status:1>> + +PacketId: 4字节整数, 值为0; +DeviceUUID: 设备32位UUID +Status: 1字节整数,1表示在线,0表示离线 + ### data北向数据上传 (无响应) #### 微服务产生的数据,点位信息为主机的点位信息