change iot_host gen_server to gen_statem
This commit is contained in:
parent
e5944eea95
commit
f269525ba9
@ -2,30 +2,29 @@
|
|||||||
%%% @author aresei
|
%%% @author aresei
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% TODO
|
%%%
|
||||||
%%% 1. 指令下发是需要微服务名称的,微服务的名称里面是带copy的完整名称: (modbus:12345)
|
|
||||||
%%% @end
|
%%% @end
|
||||||
%%% Created : 12. 3月 2023 21:27
|
%%% Created : 19. 6月 2023 10:32
|
||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
-module(iot_host).
|
-module(iot_host).
|
||||||
-author("aresei").
|
-author("aresei").
|
||||||
-include("iot.hrl").
|
-include("iot.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行
|
|
||||||
-define(TICKER_INTERVAL, 5000 * 2).
|
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
|
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
|
||||||
-export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]).
|
-export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]).
|
||||||
-export([has_session/1]).
|
-export([has_session/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行
|
||||||
|
-define(TICKER_INTERVAL, 5000 * 2).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
host_id :: integer(),
|
host_id :: integer(),
|
||||||
%% 从数据库里面读取到的数据
|
%% 从数据库里面读取到的数据
|
||||||
@ -44,12 +43,6 @@
|
|||||||
%% 是否获取到了ping请求
|
%% 是否获取到了ping请求
|
||||||
is_answered = false :: boolean(),
|
is_answered = false :: boolean(),
|
||||||
|
|
||||||
%% 标识当前主机是否已经注册,每次会话时主机需要重新协商会话; 通过status的值判断是否已经激活,值为:-1时,表示未激活
|
|
||||||
is_activated :: boolean(),
|
|
||||||
|
|
||||||
%% 会话状态
|
|
||||||
has_session = false :: boolean(),
|
|
||||||
|
|
||||||
%% 任务的自增id
|
%% 任务的自增id
|
||||||
increment_id = 1,
|
increment_id = 1,
|
||||||
%% 关联数据
|
%% 关联数据
|
||||||
@ -59,7 +52,6 @@
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec get_pid(UUID :: binary()) -> undefined | pid().
|
-spec get_pid(UUID :: binary()) -> undefined | pid().
|
||||||
get_pid(UUID) when is_binary(UUID) ->
|
get_pid(UUID) when is_binary(UUID) ->
|
||||||
Name = get_name(UUID),
|
Name = get_name(UUID),
|
||||||
@ -81,60 +73,60 @@ upstream_topic(UUID) when is_binary(UUID) ->
|
|||||||
%% 处理消息
|
%% 处理消息
|
||||||
-spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return().
|
-spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return().
|
||||||
handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) ->
|
handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) ->
|
||||||
gen_server:cast(Pid, {handle, Payload}).
|
gen_statem:cast(Pid, {handle, Payload}).
|
||||||
|
|
||||||
%% 重新加载主机的基本信息
|
%% 重新加载主机的基本信息
|
||||||
-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}.
|
-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}.
|
||||||
reload(Pid) when is_pid(Pid) ->
|
reload(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, reload).
|
gen_statem:call(Pid, reload).
|
||||||
|
|
||||||
-spec get_aes(Pid :: pid()) -> Aes :: binary().
|
-spec get_aes(Pid :: pid()) -> Aes :: binary().
|
||||||
get_aes(Pid) when is_pid(Pid) ->
|
get_aes(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, get_aes).
|
gen_statem:call(Pid, get_aes).
|
||||||
|
|
||||||
-spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}.
|
-spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}.
|
||||||
make_assoc(Pid) when is_pid(Pid) ->
|
make_assoc(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, {make_assoc, self()}).
|
gen_statem:call(Pid, {make_assoc, self()}).
|
||||||
|
|
||||||
%% 激活主机, true 表示激活; false表示关闭激活
|
%% 激活主机, true 表示激活; false表示关闭激活
|
||||||
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
||||||
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||||
gen_server:call(Pid, {activate, Auth}).
|
gen_statem:call(Pid, {activate, Auth}).
|
||||||
|
|
||||||
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
|
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
|
||||||
get_metric(Pid) when is_pid(Pid) ->
|
get_metric(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, get_metric).
|
gen_statem:call(Pid, get_metric).
|
||||||
|
|
||||||
-spec has_session(Pid :: pid()) -> boolean().
|
-spec has_session(Pid :: pid()) -> boolean().
|
||||||
has_session(Pid) when is_pid(Pid) ->
|
has_session(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, has_session).
|
gen_statem:call(Pid, has_session).
|
||||||
|
|
||||||
%% 基于rsa加密的指令都是不需要会话存在的
|
%% 基于rsa加密的指令都是不需要会话存在的
|
||||||
-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) ->
|
-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) ->
|
||||||
{ok, EncText :: binary()} | {error, Reason :: binary()}.
|
{ok, EncText :: binary()} | {error, Reason :: binary()}.
|
||||||
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
|
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
|
||||||
gen_server:call(Pid, {rsa_encode, CommandType, PlainText}).
|
gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}).
|
||||||
|
|
||||||
-spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) ->
|
-spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) ->
|
||||||
{ok, Command :: binary()} | {error, Reason :: any()}.
|
{ok, Command :: binary()} | {error, Reason :: any()}.
|
||||||
aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) ->
|
aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) ->
|
||||||
gen_server:call(Pid, {aes_encode, CommandType, Params}).
|
gen_statem:call(Pid, {aes_encode, CommandType, Params}).
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
|
||||||
-spec(start_link(Name :: atom(), UUID :: binary()) ->
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
|
%% function does not return until Module:init/1 has returned.
|
||||||
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
||||||
gen_server:start_link({local, Name}, ?MODULE, [UUID], []).
|
gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_statem callbacks
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Initializes the server
|
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||||
-spec(init(Args :: term()) ->
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
%% process to initialize.
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([UUID]) ->
|
init([UUID]) ->
|
||||||
case host_bo:get_host_by_uuid(UUID) of
|
case host_bo:get_host_by_uuid(UUID) of
|
||||||
{ok, #{<<"status">> := Status, <<"id">> := HostId}} ->
|
{ok, #{<<"status">> := Status, <<"id">> := HostId}} ->
|
||||||
@ -144,92 +136,88 @@ init([UUID]) ->
|
|||||||
%% 告知主机端需要重新授权
|
%% 告知主机端需要重新授权
|
||||||
gen_server:cast(self(), need_auth),
|
gen_server:cast(self(), need_auth),
|
||||||
|
|
||||||
{ok, #state{host_id = HostId, uuid = UUID, aes = Aes, is_activated = (Status /= ?HOST_STATUS_INACTIVE), status = Status, has_session = false}};
|
StateName = case Status =:= ?HOST_STATUS_INACTIVE of
|
||||||
|
true -> denied;
|
||||||
|
false -> activated
|
||||||
|
end,
|
||||||
|
|
||||||
|
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}};
|
||||||
undefined ->
|
undefined ->
|
||||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||||
ignore
|
ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc This function is called by a gen_statem when it needs to find out
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
%% the callback mode of the callback module.
|
||||||
State :: #state{}) ->
|
callback_mode() ->
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
handle_event_function.
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(get_metric, _From, State = #state{metrics = Metrics, has_session = true}) ->
|
|
||||||
{reply, {ok, Metrics}, State};
|
|
||||||
handle_call(get_metric, _From, State) ->
|
|
||||||
{reply, {ok, #{}}, State};
|
|
||||||
|
|
||||||
handle_call(get_aes, _From, State = #state{aes = Aes}) ->
|
%% @private
|
||||||
{reply, {ok, Aes}, State};
|
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
|
||||||
|
%% (2) when gen_statem terminates abnormally.
|
||||||
|
%% This callback is optional.
|
||||||
|
format_status(_Opt, [_PDict, _StateName, _State]) ->
|
||||||
|
Status = some_term,
|
||||||
|
Status.
|
||||||
|
|
||||||
%% 获取当前是否存在会话
|
%% @private
|
||||||
handle_call(has_session, _From, State = #state{has_session = HasSession}) ->
|
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||||
{reply, HasSession, State};
|
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||||
|
%% process message, this function is called.
|
||||||
|
handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) ->
|
||||||
|
{keep_state, State, [{reply, From, {ok, Metrics}}]};
|
||||||
|
handle_event({call, From}, get_metric, _, State) ->
|
||||||
|
{keep_state, State, [{reply, From, {ok, #{}}}]};
|
||||||
|
|
||||||
%% 实现rsa加密
|
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
|
||||||
%% 基于rsa加密的指令都是不需要会话存在的
|
{keep_state, State, [{reply, From, {ok, Aes}}]};
|
||||||
handle_call({rsa_encode, _, _}, _From, State = #state{pub_key = <<>>}) ->
|
|
||||||
{reply, {error, <<"会话未建立"/utf8>>}, State};
|
handle_event({call, From}, has_session, StateName, State) ->
|
||||||
handle_call({rsa_encode, CommandType, PlainText}, _From, State = #state{pub_key = PubKey}) ->
|
HasSession = (StateName =:= session) orelse false,
|
||||||
|
{keep_state, State, [{reply, From, HasSession}]};
|
||||||
|
|
||||||
|
%% 基于rsa加密
|
||||||
|
handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) ->
|
||||||
Reply = iot_cipher_rsa:encode(PlainText, PubKey),
|
Reply = iot_cipher_rsa:encode(PlainText, PubKey),
|
||||||
|
{keep_state, State, [{reply, From, {ok, <<CommandType:8, Reply/binary>>}}]};
|
||||||
|
handle_event({call, From}, {rsa_encode, _, _}, _, State) ->
|
||||||
|
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
|
||||||
|
|
||||||
{reply, {ok, <<CommandType:8, Reply/binary>>}, State};
|
%% 基于aes加密
|
||||||
|
handle_event({call, From}, {aes_encode, CommandType, Command}, session, State = #state{aes = AES}) ->
|
||||||
|
EncCommand = iot_cipher_aes:encrypt(AES, Command),
|
||||||
|
{keep_state, State, [{reply, From, {ok, <<CommandType:8, EncCommand/binary>>}}]};
|
||||||
|
handle_event({call, From}, {aes_encode, _, _}, _, State) ->
|
||||||
|
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
|
||||||
|
|
||||||
handle_call({make_assoc, ReceivePid}, _From, State = #state{uuid = UUID, increment_id = IncrementId, assoc_map = AssocMap}) ->
|
handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
|
||||||
BinIncrementId = erlang:integer_to_binary(IncrementId),
|
|
||||||
Assoc = <<UUID/binary, ":assoc:", BinIncrementId/binary>>,
|
|
||||||
|
|
||||||
{reply, {ok, Assoc}, State#state{assoc_map = maps:put(Assoc, ReceivePid, AssocMap), increment_id = IncrementId + 1}};
|
|
||||||
|
|
||||||
%% 重新加载主机信息
|
|
||||||
handle_call(reload, _From, State = #state{uuid = UUID}) ->
|
|
||||||
%% 重新加载主机信息
|
%% 重新加载主机信息
|
||||||
case host_bo:get_host_by_uuid(UUID) of
|
case host_bo:get_host_by_uuid(UUID) of
|
||||||
{ok, Host = #{<<"status">> := Status, <<"id">> := HostId}} ->
|
{ok, Host = #{<<"status">> := Status}} ->
|
||||||
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
|
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
|
||||||
{reply, ok, State#state{host_id = HostId, is_activated = (Status /= ?HOST_STATUS_INACTIVE), status = Status}};
|
case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of
|
||||||
|
true ->
|
||||||
|
{next_state, activated, State#state{status = Status}, [{reply, From, ok}]};
|
||||||
|
false ->
|
||||||
|
{keep_state, State#state{status = Status}, [{reply, From, ok}]}
|
||||||
|
end;
|
||||||
undefined ->
|
undefined ->
|
||||||
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
|
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
|
||||||
{reply, {error, <<"host not found">>}, State}
|
{keep_state, State, [{reply, From, {error, <<"host not found">>}}]}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
% 处理主机的激活
|
%% 关闭授权
|
||||||
handle_call({activate, Auth}, _From, State = #state{uuid = UUID, is_activated = IsActivated}) ->
|
handle_event({call, From}, {activate, false}, _, State) ->
|
||||||
lager:debug("[iot_host] host uuid: ~p, new activate status: ~p, before status is: ~p", [UUID, Auth, IsActivated]),
|
{next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]};
|
||||||
%% 关闭授权时,认为会话时关闭状态
|
%% 开启授权
|
||||||
case Auth of
|
handle_event({call, From}, {activate, true}, denied, State) ->
|
||||||
true ->
|
{next_state, activated, State, [{reply, From, ok}]};
|
||||||
{reply, ok, State#state{is_activated = true}};
|
handle_event({call, From}, {activate, true}, _, State) ->
|
||||||
false ->
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
{reply, ok, State#state{is_activated = false, has_session = false, pub_key = <<>>}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 创建命令
|
handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) ->
|
||||||
handle_call({aes_encode, _, _}, _From, State = #state{has_session = false}) ->
|
|
||||||
{reply, {error, <<"会话未建立"/utf8>>}, State};
|
|
||||||
handle_call({aes_encode, CommandType, Command}, _From, State = #state{aes = AES, has_session = true}) ->
|
|
||||||
EncCommand = iot_cipher_aes:encrypt(AES, Command),
|
|
||||||
|
|
||||||
{reply, {ok, <<CommandType:8, EncCommand/binary>>}, State};
|
|
||||||
|
|
||||||
handle_call(Info, _From, State) ->
|
|
||||||
lager:debug("[iot_host] handle info: ~p", [Info]),
|
|
||||||
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast(need_auth, State = #state{uuid = UUID}) ->
|
|
||||||
Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]),
|
Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]),
|
||||||
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1),
|
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1),
|
||||||
receive
|
receive
|
||||||
@ -238,130 +226,86 @@ handle_cast(need_auth, State = #state{uuid = UUID}) ->
|
|||||||
after 5000 ->
|
after 5000 ->
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID])
|
lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
handle_cast({handle, Payload}, State) ->
|
|
||||||
%% 处理消息
|
|
||||||
NState = handle_message(Payload, State),
|
|
||||||
{noreply, NState#state{is_answered = true}}.
|
|
||||||
|
|
||||||
%% @private
|
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理
|
||||||
%% @doc Handling all non call/cast messages
|
%% 收到消息则认为主机端已经发送了心跳包
|
||||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
handle_event(cast, {handle, Payload}, _StateName, State) ->
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
%% 超时,并且没有收到任何消息
|
|
||||||
handle_info({timeout, _, ping_ticker}, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) ->
|
|
||||||
erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker),
|
|
||||||
%% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status
|
|
||||||
NextStatus = if
|
|
||||||
not IsAnswered andalso Status == ?HOST_STATUS_ONLINE ->
|
|
||||||
{next_status, ?HOST_STATUS_OFFLINE};
|
|
||||||
IsAnswered andalso Status == ?HOST_STATUS_OFFLINE ->
|
|
||||||
{next_status, ?HOST_STATUS_ONLINE};
|
|
||||||
true ->
|
|
||||||
keep_status
|
|
||||||
end,
|
|
||||||
|
|
||||||
case NextStatus of
|
|
||||||
keep_status ->
|
|
||||||
{noreply, State#state{is_answered = false}};
|
|
||||||
{next_status, NStatus} ->
|
|
||||||
case host_bo:change_status(UUID, NStatus) of
|
|
||||||
{ok, _} ->
|
|
||||||
{noreply, State#state{status = NStatus, is_answered = false}};
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]),
|
|
||||||
{noreply, State#state{is_answered = false}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_info(Info, State = #state{uuid = UUID}) ->
|
|
||||||
lager:warning("[iot_host] host uuid: ~p, get unknown info: ~p", [UUID, Info]),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_server when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
||||||
%% with Reason. The return value is ignored.
|
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
|
||||||
State :: #state{}) -> term()).
|
|
||||||
terminate(Reason, #state{uuid = UUID}) ->
|
|
||||||
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
|
|
||||||
ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
|
||||||
lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
|
||||||
Extra :: term()) ->
|
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% 基于topic分离了,因此数据里面不需要uuid
|
|
||||||
handle_message(Payload, State) when is_binary(Payload) ->
|
|
||||||
Message = catch jiffy:decode(Payload, [return_maps]),
|
Message = catch jiffy:decode(Payload, [return_maps]),
|
||||||
lager:debug("[iot_host] get message: ~p", [Message]),
|
lager:debug("[iot_host] get message: ~p", [Message]),
|
||||||
handle_message(Message, State);
|
gen_statem:cast(self(), {handle_message, Message}),
|
||||||
|
|
||||||
%% create_session操作, 需要等待mqtt服务的响应
|
{keep_state, State#state{is_answered = true}};
|
||||||
handle_message(#{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}},
|
|
||||||
State = #state{is_activated = IsActivated, uuid = UUID, aes = Aes}) ->
|
|
||||||
|
|
||||||
|
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) ->
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
|
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
|
||||||
Reply = case IsActivated of
|
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
|
||||||
true ->
|
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||||
#{<<"a">> => true, <<"aes">> => Aes};
|
|
||||||
false ->
|
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
|
||||||
#{<<"a">> => false, <<"aes">> => <<"">>}
|
receive
|
||||||
end,
|
{ok, Ref, PacketId} ->
|
||||||
|
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId])
|
||||||
|
after 10000 ->
|
||||||
|
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID])
|
||||||
|
end,
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
|
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) ->
|
||||||
|
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
|
||||||
|
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
||||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||||
|
|
||||||
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
|
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
|
||||||
receive
|
receive
|
||||||
{ok, Ref, PacketId} ->
|
{ok, Ref, PacketId} ->
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]),
|
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]),
|
||||||
State#state{pub_key = PubKey, has_session = true}
|
{next_state, session, State#state{pub_key = PubKey}}
|
||||||
after 10000 ->
|
after 10000 ->
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]),
|
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]),
|
||||||
State
|
{keep_state, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 数据上传
|
handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||||
handle_message(#{<<"method">> := <<"data">>, <<"params">> := Data}, State = #state{has_session = true, aes = AES, uuid = UUID}) ->
|
|
||||||
PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)),
|
PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)),
|
||||||
case catch jiffy:decode(PlainData, [return_maps]) of
|
case catch jiffy:decode(PlainData, [return_maps]) of
|
||||||
Infos when is_list(Infos) ->
|
Infos when is_list(Infos) ->
|
||||||
lager:debug("[iot_host] the data is: ~p", [Infos]),
|
lager:debug("[iot_host] the data is: ~p", [Infos]),
|
||||||
insert_metrics(UUID, Infos);
|
%% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息
|
||||||
|
lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) ->
|
||||||
|
Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()),
|
||||||
|
|
||||||
|
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
|
||||||
|
%% 微服务名前缀作为measurement来保存数据
|
||||||
|
[Measurement | _] = binary:split(ServiceName, <<":">>),
|
||||||
|
|
||||||
|
Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList),
|
||||||
|
Precision = influx_client:get_precision(Timestamp),
|
||||||
|
|
||||||
|
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end)
|
||||||
|
end, Infos);
|
||||||
Other ->
|
Other ->
|
||||||
lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other])
|
lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other])
|
||||||
end,
|
end,
|
||||||
State;
|
{keep_state, State};
|
||||||
|
|
||||||
%% 处理服务器的ping
|
handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||||
handle_message(#{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}, State = #state{has_session = true, uuid = UUID, aes = AES}) ->
|
|
||||||
MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)),
|
MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)),
|
||||||
Metrics = jiffy:decode(MetricsInfo, [return_maps]),
|
Metrics = jiffy:decode(MetricsInfo, [return_maps]),
|
||||||
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
||||||
State#state{metrics = Metrics};
|
{keep_state, State#state{metrics = Metrics}};
|
||||||
|
|
||||||
%% 处理微服务的信息上报
|
handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) ->
|
||||||
handle_message(#{<<"method">> := <<"inform">>, <<"params">> := Info0}, State = #state{has_session = true, host_id = HostId, aes = AES}) ->
|
|
||||||
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
|
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
case catch jiffy:decode(Info, [return_maps]) of
|
||||||
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
|
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
|
||||||
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
|
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
|
||||||
%% props 主机id:场景id:微服务id
|
%% props 主机id:场景id:微服务id
|
||||||
[_, SceneId, MicroId] = binary:split(Props, <<":">>, [global]),
|
[_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]),
|
||||||
|
SceneId = binary_to_integer(SceneId0),
|
||||||
|
MicroId = binary_to_integer(MicroId0),
|
||||||
|
|
||||||
micro_inform_log:insert(#{
|
micro_inform_log:insert(#{
|
||||||
<<"host_id">> => HostId,
|
<<"host_id">> => HostId,
|
||||||
<<"scene_id">> => SceneId,
|
<<"scene_id">> => SceneId,
|
||||||
@ -376,52 +320,81 @@ handle_message(#{<<"method">> := <<"inform">>, <<"params">> := Info0}, State = #
|
|||||||
Error ->
|
Error ->
|
||||||
lager:warning("[iot_host] inform get error: ~p", [Error])
|
lager:warning("[iot_host] inform get error: ~p", [Error])
|
||||||
end,
|
end,
|
||||||
State;
|
{keep_state, State};
|
||||||
|
|
||||||
%% 处理命令的上报结果
|
handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) ->
|
||||||
handle_message(#{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}, State = #state{has_session = true, aes = AES}) ->
|
|
||||||
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
|
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
case catch jiffy:decode(Info, [return_maps]) of
|
||||||
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
|
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
|
||||||
scene_feedback:insert(#{
|
scene_feedback:insert(#{
|
||||||
<<"task_id">> => TaskId,
|
<<"task_id">> => TaskId,
|
||||||
<<"task_type">> => Type,
|
<<"task_type">> => Type,
|
||||||
<<"code">> => Code,
|
<<"code">> => Code,
|
||||||
<<"reason">> => Reason,
|
<<"reason">> => Reason,
|
||||||
<<"error">> => Error,
|
<<"error">> => Error,
|
||||||
<<"created_at">> => Time
|
<<"created_at">> => Time
|
||||||
});
|
});
|
||||||
Other ->
|
Other ->
|
||||||
lager:warning("[iot_host] feedback_result error: ~p", [Other])
|
lager:warning("[iot_host] feedback_result error: ~p", [Other])
|
||||||
end,
|
end,
|
||||||
State;
|
{keep_state, State};
|
||||||
|
|
||||||
%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string}
|
%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string}
|
||||||
handle_message(Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}, State = #state{assoc_map = AssocMap}) ->
|
handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) ->
|
||||||
case maps:take(Assoc, AssocMap) of
|
case maps:take(Assoc, AssocMap) of
|
||||||
error ->
|
error ->
|
||||||
State;
|
{keep_state, State};
|
||||||
{ReceiverPid, NAssocMap} ->
|
{ReceiverPid, NAssocMap} ->
|
||||||
ReceiverPid ! {host_reply, Assoc, Msg},
|
ReceiverPid ! {host_reply, Assoc, Msg},
|
||||||
State#state{assoc_map = NAssocMap}
|
{keep_state, State#state{assoc_map = NAssocMap}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_message(Message, State = #state{uuid = UUID, has_session = HasSession}) ->
|
handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) ->
|
||||||
not HasSession andalso gen_server:cast(self(), need_auth),
|
erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker),
|
||||||
lager:warning("[iot_host] host_id uuid: ~p, get a unknown message: ~p, session: ~p", [UUID, Message, HasSession]),
|
%% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status
|
||||||
State.
|
NextStatus = if
|
||||||
|
not IsAnswered andalso Status == ?HOST_STATUS_ONLINE ->
|
||||||
|
{change_status, ?HOST_STATUS_OFFLINE};
|
||||||
|
IsAnswered andalso Status == ?HOST_STATUS_OFFLINE ->
|
||||||
|
{change_status, ?HOST_STATUS_ONLINE};
|
||||||
|
true ->
|
||||||
|
keep_status
|
||||||
|
end,
|
||||||
|
|
||||||
%% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息
|
case NextStatus of
|
||||||
insert_metrics(UUID, Infos) when is_binary(UUID), is_list(Infos) ->
|
keep_status ->
|
||||||
[insert_metrics0(UUID, Info) || Info <- Infos].
|
{keep_state, State#state{is_answered = false}};
|
||||||
insert_metrics0(UUID, Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) ->
|
{change_status, NStatus} ->
|
||||||
Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()),
|
case host_bo:change_status(UUID, NStatus) of
|
||||||
|
{ok, _} ->
|
||||||
|
{keep_state, State#state{status = NStatus, is_answered = false}};
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]),
|
||||||
|
{keep_state, State#state{is_answered = false}}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
|
||||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
|
handle_event(EventType, EventContent, StateName, State) ->
|
||||||
%% TODO 微服务名前缀作为measurement来保存数据
|
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
|
||||||
[Measurement | _] = binary:split(ServiceName, <<":">>),
|
|
||||||
|
|
||||||
Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList),
|
{keep_state, State}.
|
||||||
Precision = influx_client:get_precision(Timestamp),
|
|
||||||
|
|
||||||
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end).
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_statem when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||||
|
%% Reason. The return value is ignored.
|
||||||
|
terminate(Reason, _StateName, _State = #state{uuid = UUID}) ->
|
||||||
|
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
|
||||||
|
ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||||
|
lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||||
|
{ok, StateName, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
@ -1,400 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author aresei
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 19. 6月 2023 10:32
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(iot_host2).
|
|
||||||
-author("aresei").
|
|
||||||
-include("iot.hrl").
|
|
||||||
|
|
||||||
-behaviour(gen_statem).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
|
|
||||||
-export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]).
|
|
||||||
-export([has_session/1]).
|
|
||||||
|
|
||||||
%% gen_statem callbacks
|
|
||||||
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行
|
|
||||||
-define(TICKER_INTERVAL, 5000 * 2).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
host_id :: integer(),
|
|
||||||
%% 从数据库里面读取到的数据
|
|
||||||
uuid :: binary(),
|
|
||||||
%% 当前的状态
|
|
||||||
status :: integer(),
|
|
||||||
|
|
||||||
%% rsa公钥
|
|
||||||
pub_key = <<>> :: binary(),
|
|
||||||
%% aes的key, 后续通讯需要基于这个加密
|
|
||||||
aes = <<>> :: binary(),
|
|
||||||
|
|
||||||
%% 主机的相关信息
|
|
||||||
metrics = #{} :: map(),
|
|
||||||
|
|
||||||
%% 是否获取到了ping请求
|
|
||||||
is_answered = false :: boolean(),
|
|
||||||
|
|
||||||
%% 任务的自增id
|
|
||||||
increment_id = 1,
|
|
||||||
%% 关联数据
|
|
||||||
assoc_map = #{}
|
|
||||||
}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% API
|
|
||||||
%%%===================================================================
|
|
||||||
-spec get_pid(UUID :: binary()) -> undefined | pid().
|
|
||||||
get_pid(UUID) when is_binary(UUID) ->
|
|
||||||
Name = get_name(UUID),
|
|
||||||
whereis(Name).
|
|
||||||
|
|
||||||
-spec get_name(UUID :: binary()) -> atom().
|
|
||||||
get_name(UUID) when is_binary(UUID) ->
|
|
||||||
binary_to_atom(<<"iot_host:", UUID/binary>>).
|
|
||||||
|
|
||||||
%% 获取主机的下行主题
|
|
||||||
-spec downstream_topic(UUID :: binary()) -> Topic :: binary().
|
|
||||||
downstream_topic(UUID) when is_binary(UUID) ->
|
|
||||||
<<"host/downstream/", UUID/binary>>.
|
|
||||||
|
|
||||||
-spec upstream_topic(UUID :: binary()) -> Topic :: binary().
|
|
||||||
upstream_topic(UUID) when is_binary(UUID) ->
|
|
||||||
<<"host/upstream/", UUID/binary>>.
|
|
||||||
|
|
||||||
%% 处理消息
|
|
||||||
-spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return().
|
|
||||||
handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) ->
|
|
||||||
gen_statem:cast(Pid, {handle, Payload}).
|
|
||||||
|
|
||||||
%% 重新加载主机的基本信息
|
|
||||||
-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}.
|
|
||||||
reload(Pid) when is_pid(Pid) ->
|
|
||||||
gen_statem:call(Pid, reload).
|
|
||||||
|
|
||||||
-spec get_aes(Pid :: pid()) -> Aes :: binary().
|
|
||||||
get_aes(Pid) when is_pid(Pid) ->
|
|
||||||
gen_statem:call(Pid, get_aes).
|
|
||||||
|
|
||||||
-spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}.
|
|
||||||
make_assoc(Pid) when is_pid(Pid) ->
|
|
||||||
gen_statem:call(Pid, {make_assoc, self()}).
|
|
||||||
|
|
||||||
%% 激活主机, true 表示激活; false表示关闭激活
|
|
||||||
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
|
||||||
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
|
||||||
gen_statem:call(Pid, {activate, Auth}).
|
|
||||||
|
|
||||||
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
|
|
||||||
get_metric(Pid) when is_pid(Pid) ->
|
|
||||||
gen_statem:call(Pid, get_metric).
|
|
||||||
|
|
||||||
-spec has_session(Pid :: pid()) -> boolean().
|
|
||||||
has_session(Pid) when is_pid(Pid) ->
|
|
||||||
gen_statem:call(Pid, has_session).
|
|
||||||
|
|
||||||
%% 基于rsa加密的指令都是不需要会话存在的
|
|
||||||
-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) ->
|
|
||||||
{ok, EncText :: binary()} | {error, Reason :: binary()}.
|
|
||||||
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
|
|
||||||
gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}).
|
|
||||||
|
|
||||||
-spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) ->
|
|
||||||
{ok, Command :: binary()} | {error, Reason :: any()}.
|
|
||||||
aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) ->
|
|
||||||
gen_statem:call(Pid, {aes_encode, CommandType, Params}).
|
|
||||||
|
|
||||||
|
|
||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
|
||||||
%% function does not return until Module:init/1 has returned.
|
|
||||||
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
|
||||||
gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% gen_statem callbacks
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
|
||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
|
||||||
%% process to initialize.
|
|
||||||
init([UUID]) ->
|
|
||||||
case host_bo:get_host_by_uuid(UUID) of
|
|
||||||
{ok, #{<<"status">> := Status, <<"id">> := HostId}} ->
|
|
||||||
%% 启动心跳定时器
|
|
||||||
erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker),
|
|
||||||
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
|
||||||
%% 告知主机端需要重新授权
|
|
||||||
gen_server:cast(self(), need_auth),
|
|
||||||
|
|
||||||
StateName = case Status =:= ?HOST_STATUS_INACTIVE of
|
|
||||||
true -> denied;
|
|
||||||
false -> activated
|
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}};
|
|
||||||
undefined ->
|
|
||||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
|
||||||
ignore
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_statem when it needs to find out
|
|
||||||
%% the callback mode of the callback module.
|
|
||||||
callback_mode() ->
|
|
||||||
handle_event_function.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
|
|
||||||
%% (2) when gen_statem terminates abnormally.
|
|
||||||
%% This callback is optional.
|
|
||||||
format_status(_Opt, [_PDict, _StateName, _State]) ->
|
|
||||||
Status = some_term,
|
|
||||||
Status.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
|
||||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
|
||||||
%% process message, this function is called.
|
|
||||||
handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) ->
|
|
||||||
{keep_state, State, [{reply, From, {ok, Metrics}}]};
|
|
||||||
handle_event({call, From}, get_metric, _, State) ->
|
|
||||||
{keep_state, State, [{reply, From, {ok, #{}}}]};
|
|
||||||
|
|
||||||
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
|
|
||||||
{keep_state, State, [{reply, From, {ok, Aes}}]};
|
|
||||||
|
|
||||||
handle_event({call, From}, has_session, StateName, State) ->
|
|
||||||
HasSession = (StateName =:= session) orelse false,
|
|
||||||
{keep_state, State, [{reply, From, HasSession}]};
|
|
||||||
|
|
||||||
%% 基于rsa加密
|
|
||||||
handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) ->
|
|
||||||
Reply = iot_cipher_rsa:encode(PlainText, PubKey),
|
|
||||||
{keep_state, State, [{reply, From, {ok, <<CommandType:8, Reply/binary>>}}]};
|
|
||||||
handle_event({call, From}, {rsa_encode, _, _}, _, State) ->
|
|
||||||
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
|
|
||||||
|
|
||||||
%% 基于aes加密
|
|
||||||
handle_event({call, From}, {aes_encode, CommandType, Command}, session, State = #state{aes = AES}) ->
|
|
||||||
EncCommand = iot_cipher_aes:encrypt(AES, Command),
|
|
||||||
{keep_state, State, [{reply, From, {ok, <<CommandType:8, EncCommand/binary>>}}]};
|
|
||||||
handle_event({call, From}, {aes_encode, _, _}, _, State) ->
|
|
||||||
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
|
|
||||||
|
|
||||||
handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
|
|
||||||
%% 重新加载主机信息
|
|
||||||
case host_bo:get_host_by_uuid(UUID) of
|
|
||||||
{ok, Host = #{<<"status">> := Status}} ->
|
|
||||||
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
|
|
||||||
case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of
|
|
||||||
true ->
|
|
||||||
{next_state, activated, State#state{status = Status}, [{reply, From, ok}]};
|
|
||||||
false ->
|
|
||||||
{keep_state, State#state{status = Status}, [{reply, From, ok}]}
|
|
||||||
end;
|
|
||||||
undefined ->
|
|
||||||
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
|
|
||||||
{keep_state, State, [{reply, From, {error, <<"host not found">>}}]}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 关闭授权
|
|
||||||
handle_event({call, From}, {activate, false}, _, State) ->
|
|
||||||
{next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]};
|
|
||||||
%% 开启授权
|
|
||||||
handle_event({call, From}, {activate, true}, denied, State) ->
|
|
||||||
{next_state, activated, State, [{reply, From, ok}]};
|
|
||||||
handle_event({call, From}, {activate, true}, _, State) ->
|
|
||||||
{keep_state, State, [{reply, From, ok}]};
|
|
||||||
|
|
||||||
handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) ->
|
|
||||||
Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]),
|
|
||||||
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1),
|
|
||||||
receive
|
|
||||||
{ok, Ref, PacketId} ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish need_auth reply success", [UUID, PacketId])
|
|
||||||
after 5000 ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID])
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理
|
|
||||||
%% 收到消息则认为主机端已经发送了心跳包
|
|
||||||
handle_event(cast, {handle, Payload}, _StateName, State) ->
|
|
||||||
Message = catch jiffy:decode(Payload, [return_maps]),
|
|
||||||
lager:debug("[iot_host] get message: ~p", [Message]),
|
|
||||||
gen_statem:cast(self(), {handle_message, Message}),
|
|
||||||
|
|
||||||
{keep_state, State#state{is_answered = true}};
|
|
||||||
|
|
||||||
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
|
|
||||||
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
|
|
||||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
|
||||||
|
|
||||||
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
|
|
||||||
receive
|
|
||||||
{ok, Ref, PacketId} ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId])
|
|
||||||
after 10000 ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID])
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
|
|
||||||
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
|
||||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
|
||||||
|
|
||||||
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
|
|
||||||
receive
|
|
||||||
{ok, Ref, PacketId} ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]),
|
|
||||||
{next_state, session, State#state{pub_key = PubKey}}
|
|
||||||
after 10000 ->
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]),
|
|
||||||
{keep_state, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
|
||||||
PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)),
|
|
||||||
case catch jiffy:decode(PlainData, [return_maps]) of
|
|
||||||
Infos when is_list(Infos) ->
|
|
||||||
lager:debug("[iot_host] the data is: ~p", [Infos]),
|
|
||||||
%% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息
|
|
||||||
lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) ->
|
|
||||||
Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()),
|
|
||||||
|
|
||||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
|
|
||||||
%% 微服务名前缀作为measurement来保存数据
|
|
||||||
[Measurement | _] = binary:split(ServiceName, <<":">>),
|
|
||||||
|
|
||||||
Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList),
|
|
||||||
Precision = influx_client:get_precision(Timestamp),
|
|
||||||
|
|
||||||
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end)
|
|
||||||
end, Infos);
|
|
||||||
Other ->
|
|
||||||
lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other])
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
|
||||||
MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)),
|
|
||||||
Metrics = jiffy:decode(MetricsInfo, [return_maps]),
|
|
||||||
|
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
|
||||||
{keep_state, State#state{metrics = Metrics}};
|
|
||||||
|
|
||||||
handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) ->
|
|
||||||
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
|
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
|
||||||
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
|
|
||||||
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
|
|
||||||
%% props 主机id:场景id:微服务id
|
|
||||||
[_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]),
|
|
||||||
SceneId = binary_to_integer(SceneId0),
|
|
||||||
MicroId = binary_to_integer(MicroId0),
|
|
||||||
|
|
||||||
micro_inform_log:insert(#{
|
|
||||||
<<"host_id">> => HostId,
|
|
||||||
<<"scene_id">> => SceneId,
|
|
||||||
<<"service_name">> => Name,
|
|
||||||
<<"version">> => Version,
|
|
||||||
<<"version_copy">> => VersionCopy,
|
|
||||||
<<"status">> => Status,
|
|
||||||
<<"created_at">> => At
|
|
||||||
}),
|
|
||||||
micro_set_bo:change_status(HostId, SceneId, MicroId, Status)
|
|
||||||
end, ServiceInforms);
|
|
||||||
Error ->
|
|
||||||
lager:warning("[iot_host] inform get error: ~p", [Error])
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) ->
|
|
||||||
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
|
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
|
||||||
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
|
|
||||||
scene_feedback:insert(#{
|
|
||||||
<<"task_id">> => TaskId,
|
|
||||||
<<"task_type">> => Type,
|
|
||||||
<<"code">> => Code,
|
|
||||||
<<"reason">> => Reason,
|
|
||||||
<<"error">> => Error,
|
|
||||||
<<"created_at">> => Time
|
|
||||||
});
|
|
||||||
Other ->
|
|
||||||
lager:warning("[iot_host] feedback_result error: ~p", [Other])
|
|
||||||
end,
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string}
|
|
||||||
handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) ->
|
|
||||||
case maps:take(Assoc, AssocMap) of
|
|
||||||
error ->
|
|
||||||
{keep_state, State};
|
|
||||||
{ReceiverPid, NAssocMap} ->
|
|
||||||
ReceiverPid ! {host_reply, Assoc, Msg},
|
|
||||||
{keep_state, State#state{assoc_map = NAssocMap}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) ->
|
|
||||||
erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker),
|
|
||||||
%% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status
|
|
||||||
NextStatus = if
|
|
||||||
not IsAnswered andalso Status == ?HOST_STATUS_ONLINE ->
|
|
||||||
{change_status, ?HOST_STATUS_OFFLINE};
|
|
||||||
IsAnswered andalso Status == ?HOST_STATUS_OFFLINE ->
|
|
||||||
{change_status, ?HOST_STATUS_ONLINE};
|
|
||||||
true ->
|
|
||||||
keep_status
|
|
||||||
end,
|
|
||||||
|
|
||||||
case NextStatus of
|
|
||||||
keep_status ->
|
|
||||||
{keep_state, State#state{is_answered = false}};
|
|
||||||
{change_status, NStatus} ->
|
|
||||||
case host_bo:change_status(UUID, NStatus) of
|
|
||||||
{ok, _} ->
|
|
||||||
{keep_state, State#state{status = NStatus, is_answered = false}};
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]),
|
|
||||||
{keep_state, State#state{is_answered = false}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_event(EventType, EventContent, StateName, State) ->
|
|
||||||
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
|
|
||||||
|
|
||||||
{keep_state, State}.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc This function is called by a gen_statem when it is about to
|
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
||||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
|
||||||
%% Reason. The return value is ignored.
|
|
||||||
terminate(Reason, _StateName, _State = #state{uuid = UUID}) ->
|
|
||||||
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
|
|
||||||
ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
|
||||||
lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Convert process state when code is changed
|
|
||||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
|
||||||
{ok, StateName, State}.
|
|
||||||
|
|
||||||
%%%===================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%===================================================================
|
|
||||||
Loading…
x
Reference in New Issue
Block a user