fix host2

This commit is contained in:
anlicheng 2023-06-19 11:47:03 +08:00
parent d6b82f7ab2
commit 37248fc2e3

397
apps/iot/src/iot_host2.erl Normal file
View File

@ -0,0 +1,397 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 19. 6 2023 10:32
%%%-------------------------------------------------------------------
-module(iot_host2).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
-export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]).
-export([has_session/1]).
%% gen_statem callbacks
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
-define(SERVER, ?MODULE).
%% , host上传的间隔大一些才行
-define(TICKER_INTERVAL, 5000 * 2).
-record(state, {
host_id :: integer(),
%%
uuid :: binary(),
%%
status :: integer(),
%% rsa公钥
pub_key = <<>> :: binary(),
%% aes的key,
aes = <<>> :: binary(),
%%
metrics = #{} :: map(),
%% ping请求
is_answered = false :: boolean(),
%% id
increment_id = 1,
%%
assoc_map = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid(UUID :: binary()) -> undefined | pid().
get_pid(UUID) when is_binary(UUID) ->
Name = get_name(UUID),
whereis(Name).
-spec get_name(UUID :: binary()) -> atom().
get_name(UUID) when is_binary(UUID) ->
binary_to_atom(<<"iot_host:", UUID/binary>>).
%%
-spec downstream_topic(UUID :: binary()) -> Topic :: binary().
downstream_topic(UUID) when is_binary(UUID) ->
<<"host/downstream/", UUID/binary>>.
-spec upstream_topic(UUID :: binary()) -> Topic :: binary().
upstream_topic(UUID) when is_binary(UUID) ->
<<"host/upstream/", UUID/binary>>.
%%
-spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return().
handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) ->
gen_statem:cast(Pid, {handle, Payload}).
%%
-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}.
reload(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, reload).
-spec get_aes(Pid :: pid()) -> Aes :: binary().
get_aes(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_aes).
-spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}.
make_assoc(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, {make_assoc, self()}).
%% , true false表示关闭激活
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
gen_statem:call(Pid, {activate, Auth}).
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
get_metric(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_metric).
-spec has_session(Pid :: pid()) -> boolean().
has_session(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, has_session).
%% rsa加密的指令都是不需要会话存在的
-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) ->
{ok, EncText :: binary()} | {error, Reason :: binary()}.
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}).
-spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) ->
{ok, Command :: binary()} | {error, Reason :: any()}.
aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) ->
gen_statem:call(Pid, {aes_encode, CommandType, Params}).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([UUID]) ->
case host_bo:get_host_by_uuid(UUID) of
{ok, #{<<"status">> := Status, <<"id">> := HostId}} ->
%%
erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker),
Aes = list_to_binary(iot_util:rand_bytes(32)),
%%
gen_server:cast(self(), need_auth),
StateName = case Status =:= ?HOST_STATUS_INACTIVE of
true -> denied;
false -> activated
end,
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}};
undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
ignore
end.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
%% (2) when gen_statem terminates abnormally.
%% This callback is optional.
format_status(_Opt, [_PDict, _StateName, _State]) ->
Status = some_term,
Status.
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) ->
{keep_state, State, [{reply, From, {ok, Metrics}}]};
handle_event({call, From}, get_metric, _, State) ->
{keep_state, State, [{reply, From, {ok, #{}}}]};
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
{keep_state, State, [{reply, From, {ok, Aes}}]};
handle_event({call, From}, has_session, StateName, State) ->
HasSession = (StateName =:= session) orelse false,
{keep_state, State, [{reply, From, HasSession}]};
%% rsa加密
handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) ->
Reply = iot_cipher_rsa:encode(PlainText, PubKey),
{keep_state, State, [{reply, From, {ok, <<CommandType:8, Reply/binary>>}}]};
handle_event({call, From}, {rsa_encode, _, _}, _, State) ->
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
%% aes加密
handle_event({call, From}, {aes_encode, CommandType, Command}, session, State = #state{aes = AES}) ->
EncCommand = iot_cipher_aes:encrypt(AES, Command),
{keep_state, State, [{reply, From, {ok, <<CommandType:8, EncCommand/binary>>}}]};
handle_event({call, From}, {aes_encode, _, _}, _, State) ->
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
%%
case host_bo:get_host_by_uuid(UUID) of
{ok, Host = #{<<"status">> := Status}} ->
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of
true ->
{next_state, activated, State#state{status = Status}, [{reply, From, ok}]};
false ->
{keep_state, State#state{status = Status}, [{reply, From, ok}]}
end;
undefined ->
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
{keep_state, State, [{reply, From, {error, <<"host not found">>}}]}
end;
%%
handle_event({call, From}, {activate, false}, _, State) ->
{next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]};
%%
handle_event({call, From}, {activate, true}, denied, State) ->
{next_state, activated, State, [{reply, From, ok}]};
handle_event({call, From}, {activate, true}, _, State) ->
{keep_state, State, [{reply, From, ok}]};
handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) ->
Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]),
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish need_auth reply success", [UUID, PacketId])
after 5000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID])
end,
{keep_state, State};
%% json格式然后再处理, host进程里面处理
%%
handle_event(cast, {handle, Payload}, _StateName, State) ->
Message = catch jiffy:decode(Payload, [return_maps]),
lager:debug("[iot_host] get message: ~p", [Message]),
gen_statem:cast(self(), {handle_message, Message}),
{keep_state, State#state{is_answered = true}};
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId])
after 10000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID])
end,
{keep_state, State};
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => true, <<"aes">> => Aes},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]),
{next_state, session, State#state{pub_key = PubKey}}
after 10000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]),
{keep_state, State}
end;
handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)),
case catch jiffy:decode(PlainData, [return_maps]) of
Infos when is_list(Infos) ->
lager:debug("[iot_host] the data is: ~p", [Infos]),
%% , TODO , Fields里面包含了 <<"device_id">>
lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) ->
Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()),
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
%% measurement来保存数据
[Measurement | _] = binary:split(ServiceName, <<":">>),
Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList),
Precision = influx_client:get_precision(Timestamp),
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end)
end, Infos);
Other ->
lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other])
end,
{keep_state, State};
handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)),
Metrics = jiffy:decode(MetricsInfo, [return_maps]),
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}};
handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
%% props id:id:id
[_, SceneId, MicroId] = binary:split(Props, <<":">>, [global]),
micro_inform_log:insert(#{
<<"host_id">> => HostId,
<<"scene_id">> => SceneId,
<<"service_name">> => Name,
<<"version">> => Version,
<<"version_copy">> => VersionCopy,
<<"status">> => Status,
<<"created_at">> => At
}),
micro_set_bo:change_status(HostId, SceneId, MicroId, Status)
end, ServiceInforms);
Error ->
lager:warning("[iot_host] inform get error: ~p", [Error])
end,
{keep_state, State};
handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
scene_feedback:insert(#{
<<"task_id">> => TaskId,
<<"task_type">> => Type,
<<"code">> => Code,
<<"reason">> => Reason,
<<"error">> => Error,
<<"created_at">> => Time
});
Other ->
lager:warning("[iot_host] feedback_result error: ~p", [Other])
end,
{keep_state, State};
%% , : {"code": 0|1, "message": "", "assoc": string}
handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) ->
case maps:take(Assoc, AssocMap) of
error ->
{keep_state, State};
{ReceiverPid, NAssocMap} ->
ReceiverPid ! {host_reply, Assoc, Msg},
{keep_state, State#state{assoc_map = NAssocMap}}
end;
handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) ->
erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker),
%% : keep_status
NextStatus = if
not IsAnswered andalso Status == ?HOST_STATUS_ONLINE ->
{change_status, ?HOST_STATUS_OFFLINE};
IsAnswered andalso Status == ?HOST_STATUS_OFFLINE ->
{change_status, ?HOST_STATUS_ONLINE};
true ->
keep_status
end,
case NextStatus of
keep_status ->
{keep_state, State#state{is_answered = false}};
{change_status, NStatus} ->
case host_bo:change_status(UUID, NStatus) of
{ok, _} ->
{keep_state, State#state{status = NStatus, is_answered = false}};
{error, Reason} ->
lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]),
{keep_state, State#state{is_answered = false}}
end
end;
handle_event(EventType, EventContent, StateName, State) ->
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, _State = #state{uuid = UUID}) ->
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================