This commit is contained in:
anlicheng 2025-04-22 14:23:57 +08:00
parent aa9b4dde7d
commit 66d78e1929

View File

@ -9,6 +9,7 @@
-module(iot_host).
-author("aresei").
-include("iot.hrl").
-include("message_pb.hrl").
-behaviour(gen_statem).
@ -21,7 +22,7 @@
%% API
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
-export([get_metric/1, publish_message/4, get_status/1]).
-export([publish_directive/6, send_directive/5]).
-export([attach_channel/2]).
-export([reload_device/2, delete_device/2, activate_device/3]).
@ -34,8 +35,6 @@
host_id :: integer(),
%%
uuid :: binary(),
%% aes的key,
aes = <<>> :: binary(),
has_session = false :: boolean(),
%%
heartbeat_counter = 0 :: integer(),
@ -67,10 +66,6 @@ get_alias_name(HostId0) when is_integer(HostId0) ->
handle(Pid, Packet) when is_pid(Pid) ->
gen_statem:cast(Pid, {handle, Packet}).
-spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}.
get_aes(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_aes).
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
get_status(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_status).
@ -197,7 +192,7 @@ init([UUID]) ->
true -> ?STATE_ACTIVATED;
false -> ?STATE_DENIED
end,
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}};
{ok, StateName, #state{host_id = HostId, uuid = UUID, has_session = false}};
undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
ignore
@ -217,9 +212,6 @@ callback_mode() ->
handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
{keep_state, State, [{reply, From, {ok, Metrics}}]};
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
{keep_state, State, [{reply, From, {ok, Aes}}]};
%%
handle_event({call, From}, get_status, _, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) ->
%% devices
@ -242,20 +234,9 @@ handle_event({call, From}, get_status, _, State = #state{host_id = HostId, chann
},
{keep_state, State, [{reply, From, {ok, Reply}}]};
%% , ; aes类型的命令的时候session是存在的
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
Command = iot_cipher_aes:encrypt(AES, Command0),
%% websocket发送请求
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
{keep_state, State, [{reply, From, {ok, Ref}}]};
%% channel存在
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _,
State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) when is_binary(Command), is_pid(ChannelPid) ->
%% websocket发送请求
lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]),
@ -268,11 +249,10 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid =
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
%% , aes加密session是存在的
handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
handle_event({call, From}, {publish_directive, ReceiverPid, Directive}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
Directive = iot_cipher_aes:encrypt(AES, Directive0),
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive]),
%% websocket发送请求
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>),
@ -284,11 +264,10 @@ handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
%% , aes加密session是存在的
handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
handle_event({call, From}, {send_directive, Directive}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
Directive = iot_cipher_aes:encrypt(AES, Directive0),
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive]),
%% websocket发送请求
ws_channel:send(ChannelPid, <<16:8, Directive/binary>>),
@ -300,7 +279,7 @@ handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid =
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
%%
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
@ -372,34 +351,18 @@ handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) ->
end;
%% json格式然后再处理, host进程里面处理, props
handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) ->
PlainData = iot_cipher_aes:decrypt(AES, Data),
case catch jiffy:decode(PlainData, [return_maps]) of
Info when is_map(Info) ->
handle_data(Info, State);
Other ->
lager:notice("[iot_host] the data is invalid json: ~p", [Other])
end,
handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
handle_data(Data, State),
{keep_state, State};
%% ping的数据是通过aes加密后的
handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
case catch jiffy:decode(MetricsInfo, [return_maps]) of
Metrics when is_map(Metrics) ->
handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}};
Other ->
lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
{keep_state, State}
end;
handle_event(cast, {handle, {inform, Info0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, ServiceInforms]),
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, has_session = true}) ->
#service_inform{name = Name, props = Props, version = Version, version_copy = VersionCopy, status = Status, at = At} = Inform,
lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, Inform]),
%% props id:id:id
{SceneId, MicroId} = parse_props(Props),
micro_inform_log:insert(#{
@ -411,32 +374,22 @@ handle_event(cast, {handle, {inform, Info0}}, ?STATE_ACTIVATED, State = #state{u
<<"status">> => Status,
<<"created_at">> => At
}),
micro_set_bo:change_status(HostId, SceneId, MicroId, Status)
end, ServiceInforms);
Error ->
lager:warning("[iot_host] inform get error: ~p", [Error])
end,
micro_set_bo:change_status(HostId, SceneId, MicroId, Status),
{keep_state, State};
handle_event(cast, {handle, {feedback_step, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
Data = #{<<"task_id">> := TaskId, <<"code">> := Code} ->
handle_event(cast, {handle, {feedback_step, FeedbackStep}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
#feedback_step{task_id = TaskId, code = Code} = FeedbackStep,
Result = scene_feedback_step:insert(#{
<<"task_id">> => TaskId,
<<"code">> => Code,
<<"created_at">> => iot_util:current_time()
}),
lager:debug("[iot_host] feedback_step info: ~p, insert result: ~p", [Data, Result]);
Other ->
lager:warning("[iot_host] feedback_step error: ~p", [Other])
end,
lager:debug("[iot_host] feedback_step: ~p, insert result: ~p", [FeedbackStep, Result]),
{keep_state, State};
handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
#feedback_result{task_id = TaskId, time = Time, code = Code, reason = Reason, error = Error, task_type = Type} = FeedbackResult,
scene_feedback:insert(#{
<<"task_id">> => TaskId,
<<"task_type">> => Type,
@ -444,57 +397,55 @@ handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_ACTIVATED, State =
<<"reason">> => Reason,
<<"error">> => Error,
<<"created_at">> => Time
});
Other ->
lager:warning("[iot_host] feedback_result error: ~p", [Other])
end,
}),
{keep_state, State};
handle_event(cast, {handle, {event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
EventText = iot_cipher_aes:decrypt(AES, Event0),
lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, EventText]),
case catch jiffy:decode(EventText, [return_maps]) of
#{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} ->
DevicePid = iot_device:get_pid(DeviceUUID),
iot_device:change_status(DevicePid, Status);
Event when is_map(Event) ->
lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
Other ->
lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
end,
{keep_state, State};
%handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
% lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]),
% #event{event_type = EventType, params = } = Event,
%
% case catch jiffy:decode(EventText, [return_maps]) of
% #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} ->
% DevicePid = iot_device:get_pid(DeviceUUID),
% iot_device:change_status(DevicePid, Status);
% Event when is_map(Event) ->
% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
% Other ->
% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
% end,
% {keep_state, State};
handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
EventText = iot_cipher_aes:decrypt(AES, Event0),
lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]),
case catch jiffy:decode(EventText, [return_maps]) of
#{<<"event_type">> := EventType, <<"params">> := Params0 = #{<<"device_uuid">> := DeviceUUID, <<"props">> := Props}} ->
case iot_device:is_alive(DeviceUUID) of
error ->
lager:notice("[iot_host] uuid: ~p, device_uuid: ~p is not alive, get ai_event: ~p", [UUID, DeviceUUID, EventText]),
ok;
{ok, DevicePid} ->
Params = maps:remove(<<"props">>, Params0),
{SceneId, MicroId} = parse_props(Props),
%% mysql
Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of
{ok, LogId} ->
iot_api:ai_event(LogId);
_ ->
ok
end,
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
end;
Event when is_map(Event) ->
lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
Other ->
lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
end,
{keep_state, State};
%handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
% EventText = iot_cipher_aes:decrypt(AES, Event0),
% lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]),
% case catch jiffy:decode(EventText, [return_maps]) of
% #{<<"event_type">> := EventType, <<"params">> := Params0 = #{<<"device_uuid">> := DeviceUUID, <<"props">> := Props}} ->
% case iot_device:is_alive(DeviceUUID) of
% error ->
% lager:notice("[iot_host] uuid: ~p, device_uuid: ~p is not alive, get ai_event: ~p", [UUID, DeviceUUID, EventText]),
% ok;
% {ok, DevicePid} ->
% Params = maps:remove(<<"props">>, Params0),
% {SceneId, MicroId} = parse_props(Props),
%
% %% mysql
% Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
% case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of
% {ok, LogId} ->
% iot_api:ai_event(LogId);
% _ ->
% ok
% end,
% iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
%
% iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
% end;
% Event when is_map(Event) ->
% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
% Other ->
% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
% end,
% {keep_state, State};
%%
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
@ -557,7 +508,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
%%%===================================================================
%%
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID})
handle_data(#data{device_uuid = DeviceUUID, service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID})
when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
case iot_device:get_pid(DeviceUUID) of
@ -580,7 +531,7 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName
end
end;
handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) ->
handle_data(#data{service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID}) ->
%%
iot_router:route_uuid(UUID, FieldsList, Timestamp),
@ -610,11 +561,10 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]).
%% state转换成map
state_map(#state{host_id = HostId, uuid = UUID, aes = Aes, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) ->
state_map(#state{host_id = HostId, uuid = UUID, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) ->
#{
host_id => HostId,
uuid => UUID,
aes => Aes,
has_session => HasSession,
heartbeat_counter => HeartbeatCounter,
channel_pid => ChannelPid,