完善主机流程
This commit is contained in:
parent
8231ef42b4
commit
de77d77176
@ -8,11 +8,6 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
-author("licheng5").
|
||||
|
||||
%% 主机状态定义
|
||||
-define(HOST_STATUS_INACTIVE, -1). %% 未接入
|
||||
-define(HOST_STATUS_OFFLINE, 0). %% 离线
|
||||
-define(HOST_STATUS_ONLINE, 1). %% 在线
|
||||
|
||||
%% 下发的任务状态
|
||||
-define(TASK_STATUS_INIT, -1). %% 未接入
|
||||
-define(TASK_STATUS_FAILED, 0). %% 离线
|
||||
@ -20,7 +15,7 @@
|
||||
|
||||
%% 主机端上报数据类型标识
|
||||
%% 建立到websocket的register关系
|
||||
-define(METHOD_REGISTER, 16#00).
|
||||
-define(METHOD_AUTH, 16#00).
|
||||
-define(METHOD_CREATE_SESSION, 16#01).
|
||||
|
||||
-define(METHOD_DATA, 16#02).
|
||||
@ -28,6 +23,7 @@
|
||||
-define(METHOD_INFORM, 16#04).
|
||||
-define(METHOD_FEEDBACK_STEP, 16#05).
|
||||
-define(METHOD_FEEDBACK_RESULT, 16#06).
|
||||
-define(METHOD_EVENT, 16#07).
|
||||
|
||||
%% 消息体类型
|
||||
-define(PACKET_REQUEST, 16#01).
|
||||
|
||||
@ -11,8 +11,7 @@
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_all_hosts/0, change_status/2, is_authorized/1, get_host/1, get_host_by_uuid/1, create_host/1]).
|
||||
-export([ensured_host/1]).
|
||||
-export([get_all_hosts/0, change_status/2, get_host_by_uuid/1]).
|
||||
|
||||
-spec get_all_hosts() -> UUIDList :: [binary()].
|
||||
get_all_hosts() ->
|
||||
@ -23,40 +22,10 @@ get_all_hosts() ->
|
||||
[]
|
||||
end.
|
||||
|
||||
get_host(HostId) when is_integer(HostId) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]).
|
||||
|
||||
get_host_by_uuid(UUID) when is_binary(UUID) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]).
|
||||
|
||||
create_host(UUID) when is_binary(UUID) ->
|
||||
mysql_pool:insert(mysql_iot, <<"host">>, #{<<"UUID">> => UUID, <<"status">> => ?HOST_STATUS_INACTIVE}, true).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_status(UUID, Status) when is_binary(UUID), is_integer(Status) ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]).
|
||||
|
||||
%% 判断主机是否已经授权
|
||||
is_authorized(HostId) when is_integer(HostId) ->
|
||||
case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]) of
|
||||
{ok, #{<<"status">> := Status}} when Status =/= ?HOST_STATUS_INACTIVE ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
|
||||
-spec ensured_host(UUID :: binary()) -> ok | {error, Reason :: any()}.
|
||||
ensured_host(UUID) when is_binary(UUID) ->
|
||||
%% 查找数据库,如果没有则插入
|
||||
case get_host_by_uuid(UUID) of
|
||||
{ok, _} ->
|
||||
ok;
|
||||
undefined ->
|
||||
case create_host(UUID) of
|
||||
{ok, _} ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end
|
||||
end.
|
||||
@ -12,6 +12,14 @@
|
||||
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% 主机是否在线
|
||||
-define(HOST_OFFLINE, 0).
|
||||
-define(HOST_ONLINE, 1).
|
||||
|
||||
%% 主机是否授权
|
||||
-define(HOST_DENIED, 0).
|
||||
-define(HOST_AUTHED, 1).
|
||||
|
||||
%% API
|
||||
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
|
||||
-export([get_metric/1, publish_message/4, get_aes/1]).
|
||||
@ -24,7 +32,6 @@
|
||||
host_id :: integer(),
|
||||
%% 从数据库里面读取到的数据
|
||||
uuid :: binary(),
|
||||
|
||||
%% aes的key, 后续通讯需要基于这个加密
|
||||
aes = <<>> :: binary(),
|
||||
|
||||
@ -112,18 +119,17 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([UUID]) ->
|
||||
case host_bo:get_host_by_uuid(UUID) of
|
||||
{ok, #{<<"status">> := Status, <<"id">> := HostId}} ->
|
||||
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
||||
|
||||
StateName = case Status =:= ?HOST_STATUS_INACTIVE of
|
||||
true ->
|
||||
denied;
|
||||
false ->
|
||||
%% 重启时,认为主机是离线状态; 等待主机主动建立连接
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||
activated
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||
|
||||
case host_bo:get_host_by_uuid(UUID) of
|
||||
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := HostId}} ->
|
||||
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
||||
StateName = case AuthorizeStatus =:= ?HOST_AUTHED of
|
||||
false -> denied;
|
||||
true -> activated
|
||||
end,
|
||||
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}};
|
||||
undefined ->
|
||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||
@ -148,10 +154,8 @@ format_status(_Opt, [_PDict, _StateName, _State]) ->
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) ->
|
||||
handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
|
||||
{keep_state, State, [{reply, From, {ok, Metrics}}]};
|
||||
handle_event({call, From}, get_metric, _, State) ->
|
||||
{keep_state, State, [{reply, From, {ok, #{}}}]};
|
||||
|
||||
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
|
||||
{keep_state, State, [{reply, From, {ok, Aes}}]};
|
||||
@ -165,34 +169,31 @@ handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Com
|
||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||
|
||||
%% 只要channel存在,就负责将消息推送到边缘端主机
|
||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, State = #state{channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
||||
%% 通过websocket发送请求
|
||||
lager:debug("[iot_host] will publish_message: ~p", [Command]),
|
||||
lager:debug("[iot_host] host: ~p, will publish_message: ~p", [UUID, Command]),
|
||||
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
||||
|
||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||
|
||||
handle_event({call, From}, {publish_message, _, _, _}, _, State) ->
|
||||
{keep_state, State, [{reply, From, {error, <<"命令类型错误,发送命令失败"/utf8>>}}]};
|
||||
{keep_state, State, [{reply, From, {error, <<"主机状态错误,发送命令失败"/utf8>>}}]};
|
||||
|
||||
%% 重新加载主机信息
|
||||
handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
|
||||
%% 重新加载主机信息
|
||||
case host_bo:get_host_by_uuid(UUID) of
|
||||
{ok, Host = #{<<"status">> := Status}} ->
|
||||
{ok, Host = #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID),
|
||||
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
|
||||
case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of
|
||||
case StateName == denied andalso AuthorizeStatus =:= ?HOST_AUTHED of
|
||||
true ->
|
||||
{next_state, activated, State, [{reply, From, ok}]};
|
||||
false ->
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
undefined ->
|
||||
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
|
||||
{keep_state, State, [{reply, From, {error, <<"host not found">>}}]}
|
||||
end;
|
||||
|
||||
%% 关闭授权
|
||||
handle_event({call, From}, {activate, false}, _, State) ->
|
||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID}) ->
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||
{next_state, denied, State, [{reply, From, ok}]};
|
||||
%% 开启授权
|
||||
handle_event({call, From}, {activate, true}, denied, State) ->
|
||||
@ -225,7 +226,7 @@ handle_event({call, From}, {create_session, PubKey}, denied, State = #state{uuid
|
||||
handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{uuid = UUID, aes = Aes}) ->
|
||||
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_STATUS_ONLINE),
|
||||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
||||
lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [StateName, UUID, AffectedRow]),
|
||||
|
||||
{next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
@ -263,7 +264,8 @@ handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID,
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||
%% 任意状态下都可以ping
|
||||
handle_event(cast, {handle, {ping, CipherMetric}}, _, State = #state{uuid = UUID, aes = AES}) ->
|
||||
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
|
||||
case catch jiffy:decode(MetricsInfo, [return_maps]) of
|
||||
Metrics when is_map(Metrics) ->
|
||||
@ -274,12 +276,11 @@ handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
handle_event(cast, {handle, {inform, Info0}}, session, State = #state{host_id = HostId, aes = AES}) ->
|
||||
lager:debug("[iot_host] handler inform, aes: ~p", [AES]),
|
||||
handle_event(cast, {handle, {inform, Info0}}, session, State = #state{uuid = UUID, host_id = HostId, aes = AES}) ->
|
||||
Info = iot_cipher_aes:decrypt(AES, Info0),
|
||||
case catch jiffy:decode(Info, [return_maps]) of
|
||||
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
|
||||
lager:debug("[iot_host] service infos is: ~p", [ServiceInforms]),
|
||||
lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, ServiceInforms]),
|
||||
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
|
||||
%% props 主机id:场景id:微服务id
|
||||
[_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]),
|
||||
@ -334,10 +335,14 @@ handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{a
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
%% 其他情况丢弃数据
|
||||
handle_event(cast, {handle, {data, _}}, _, State) ->
|
||||
{keep_state, State};
|
||||
|
||||
%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换
|
||||
handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) ->
|
||||
lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]),
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||
|
||||
%% 会话状态如果链接丢失,需要切换到activated状态,其他情况保持不变
|
||||
case StateName =:= session of
|
||||
@ -359,7 +364,7 @@ handle_event(EventType, EventContent, StateName, State) ->
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, StateName, _State = #state{uuid = UUID}) ->
|
||||
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]),
|
||||
host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||
host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
|
||||
@ -52,24 +52,25 @@ websocket_init(_State) ->
|
||||
%% 初始状态为true
|
||||
{ok, #state{packet_id = 1}}.
|
||||
|
||||
websocket_handle({binary, <<?PACKET_REQUEST, PacketId:32, ?METHOD_REGISTER:8, Data/binary>>}, State) ->
|
||||
websocket_handle({binary, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, Data/binary>>}, State) ->
|
||||
#{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]),
|
||||
lager:debug("[ws_channel] register uuid: ~p, request message: ~p", [UUID, Data]),
|
||||
lager:debug("[ws_channel] auth uuid: ~p, request message: ~p", [UUID, Data]),
|
||||
case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of
|
||||
true ->
|
||||
%% 查找数据库,如果没有则插入
|
||||
ok = host_bo:ensured_host(UUID),
|
||||
case host_bo:get_host_by_uuid(UUID) of
|
||||
undefined ->
|
||||
lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]),
|
||||
{stop, State};
|
||||
{ok, _} ->
|
||||
%% 尝试启动主机的服务进程
|
||||
{ok, HostPid} = iot_host_sup:ensured_host_started(UUID),
|
||||
ok = iot_host:attach_channel(HostPid, self()),
|
||||
%% 建立到host的monitor
|
||||
erlang:monitor(process, HostPid),
|
||||
|
||||
Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]),
|
||||
|
||||
lager:debug("[ws_channel] register success, host uuid: ~p", [UUID]),
|
||||
|
||||
{reply, {binary, <<?PACKET_RESPONSE, PacketId:32, 0:8, Reply/binary>>}, State#state{uuid = UUID, host_pid = HostPid}};
|
||||
{reply, {binary, <<?PACKET_RESPONSE, PacketId:32, 0:8, Reply/binary>>}, State#state{uuid = UUID, host_pid = HostPid}}
|
||||
end;
|
||||
false ->
|
||||
lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]),
|
||||
{stop, State}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
## 消息类型说明
|
||||
|
||||
### register消息,
|
||||
### register消息
|
||||
|
||||
#### 请求
|
||||
<<0x01, PacketId:4, Method:1, Body:任意长度>>
|
||||
@ -67,6 +67,13 @@ Body: 公钥信息
|
||||
PacketId: 4字节整数, 值为0;
|
||||
Body: 公钥信息
|
||||
|
||||
### 主机上传终端设备的在线或离线事件
|
||||
<<0x01, PacketId:4, 0x07, DeviceUUID:32/binary, Status:1>>
|
||||
|
||||
PacketId: 4字节整数, 值为0;
|
||||
DeviceUUID: 设备32位UUID
|
||||
Status: 1字节整数,1表示在线,0表示离线
|
||||
|
||||
### data北向数据上传 (无响应)
|
||||
|
||||
#### 微服务产生的数据,点位信息为主机的点位信息
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user