diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 1bdfabd..9b276b4 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -9,6 +9,7 @@ -module(iot_host). -author("aresei"). -include("iot.hrl"). +-include("message_pb.hrl"). -behaviour(gen_statem). @@ -21,7 +22,7 @@ %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). +-export([get_metric/1, publish_message/4, get_status/1]). -export([publish_directive/6, send_directive/5]). -export([attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). @@ -34,8 +35,6 @@ host_id :: integer(), %% 从数据库里面读取到的数据 uuid :: binary(), - %% aes的key, 后续通讯需要基于这个加密 - aes = <<>> :: binary(), has_session = false :: boolean(), %% 心跳计数器 heartbeat_counter = 0 :: integer(), @@ -67,10 +66,6 @@ get_alias_name(HostId0) when is_integer(HostId0) -> handle(Pid, Packet) when is_pid(Pid) -> gen_statem:cast(Pid, {handle, Packet}). --spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}. -get_aes(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_aes). - -spec get_status(Pid :: pid()) -> {ok, Status :: map()}. get_status(Pid) when is_pid(Pid) -> gen_statem:call(Pid, get_status). @@ -197,7 +192,7 @@ init([UUID]) -> true -> ?STATE_ACTIVATED; false -> ?STATE_DENIED end, - {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}}; + {ok, StateName, #state{host_id = HostId, uuid = UUID, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -217,9 +212,6 @@ callback_mode() -> handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) -> {keep_state, State, [{reply, From, {ok, Metrics}}]}; -handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> - {keep_state, State, [{reply, From, {ok, Aes}}]}; - %% 获取主机的状态 handle_event({call, From}, get_status, _, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> %% 启动主机相关的devices @@ -242,20 +234,9 @@ handle_event({call, From}, get_status, _, State = #state{host_id = HostId, chann }, {keep_state, State, [{reply, From, {ok, Reply}}]}; -%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的 -handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_ACTIVATED, - State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> - - lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]), - Command = iot_cipher_aes:encrypt(AES, Command0), - %% 通过websocket发送请求 - Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), - - {keep_state, State, [{reply, From, {ok, Ref}}]}; - %% 只要channel存在,就负责将消息推送到边缘端主机 handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, - State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> + State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) when is_binary(Command), is_pid(ChannelPid) -> %% 通过websocket发送请求 lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]), @@ -268,11 +249,10 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]}; %% 发送指令时, 指令要通过aes加密,必须要求session是存在的 -handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ACTIVATED, - State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> +handle_event({call, From}, {publish_directive, ReceiverPid, Directive}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) -> - lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), - Directive = iot_cipher_aes:encrypt(AES, Directive0), + lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive]), %% 通过websocket发送请求 Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>), @@ -284,11 +264,10 @@ handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{ {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; %% 发送指令时, 指令要通过aes加密,必须要求session是存在的 -handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED, - State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> +handle_event({call, From}, {send_directive, Directive}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) -> - lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), - Directive = iot_cipher_aes:encrypt(AES, Directive0), + lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive]), %% 通过websocket发送请求 ws_channel:send(ChannelPid, <<16:8, Directive/binary>>), @@ -300,7 +279,7 @@ handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; %% 激活主机 -handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> +handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]), @@ -372,129 +351,101 @@ handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) -> end; %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 -handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) -> - PlainData = iot_cipher_aes:decrypt(AES, Data), - case catch jiffy:decode(PlainData, [return_maps]) of - Info when is_map(Info) -> - handle_data(Info, State); - Other -> - lager:notice("[iot_host] the data is invalid json: ~p", [Other]) - end, +handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{has_session = true}) -> + handle_data(Data, State), {keep_state, State}; %% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行 -handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) -> - MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), - case catch jiffy:decode(MetricsInfo, [return_maps]) of - Metrics when is_map(Metrics) -> - lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), - {keep_state, State#state{metrics = Metrics}}; - Other -> - lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]), - {keep_state, State} - end; +handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> + lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), + {keep_state, State#state{metrics = Metrics}}; -handle_event(cast, {handle, {inform, Info0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) -> - Info = iot_cipher_aes:decrypt(AES, Info0), - case catch jiffy:decode(Info, [return_maps]) of - #{<<"at">> := At, <<"services">> := ServiceInforms} -> - lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, ServiceInforms]), - lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) -> - %% props 主机id:场景id:微服务id - {SceneId, MicroId} = parse_props(Props), - micro_inform_log:insert(#{ - <<"host_id">> => HostId, - <<"scene_id">> => SceneId, - <<"service_name">> => Name, - <<"version">> => Version, - <<"version_copy">> => VersionCopy, - <<"status">> => Status, - <<"created_at">> => At - }), - micro_set_bo:change_status(HostId, SceneId, MicroId, Status) - end, ServiceInforms); - Error -> - lager:warning("[iot_host] inform get error: ~p", [Error]) - end, +handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, has_session = true}) -> + #service_inform{name = Name, props = Props, version = Version, version_copy = VersionCopy, status = Status, at = At} = Inform, + lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, Inform]), + %% props 主机id:场景id:微服务id + {SceneId, MicroId} = parse_props(Props), + micro_inform_log:insert(#{ + <<"host_id">> => HostId, + <<"scene_id">> => SceneId, + <<"service_name">> => Name, + <<"version">> => Version, + <<"version_copy">> => VersionCopy, + <<"status">> => Status, + <<"created_at">> => At + }), + micro_set_bo:change_status(HostId, SceneId, MicroId, Status), {keep_state, State}; -handle_event(cast, {handle, {feedback_step, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) -> - Info = iot_cipher_aes:decrypt(AES, Info0), - case catch jiffy:decode(Info, [return_maps]) of - Data = #{<<"task_id">> := TaskId, <<"code">> := Code} -> - Result = scene_feedback_step:insert(#{ - <<"task_id">> => TaskId, - <<"code">> => Code, - <<"created_at">> => iot_util:current_time() - }), - lager:debug("[iot_host] feedback_step info: ~p, insert result: ~p", [Data, Result]); - Other -> - lager:warning("[iot_host] feedback_step error: ~p", [Other]) - end, +handle_event(cast, {handle, {feedback_step, FeedbackStep}}, ?STATE_ACTIVATED, State = #state{has_session = true}) -> + #feedback_step{task_id = TaskId, code = Code} = FeedbackStep, + Result = scene_feedback_step:insert(#{ + <<"task_id">> => TaskId, + <<"code">> => Code, + <<"created_at">> => iot_util:current_time() + }), + lager:debug("[iot_host] feedback_step: ~p, insert result: ~p", [FeedbackStep, Result]), + {keep_state, State}; -handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) -> - Info = iot_cipher_aes:decrypt(AES, Info0), - case catch jiffy:decode(Info, [return_maps]) of - #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> - scene_feedback:insert(#{ - <<"task_id">> => TaskId, - <<"task_type">> => Type, - <<"code">> => Code, - <<"reason">> => Reason, - <<"error">> => Error, - <<"created_at">> => Time - }); - Other -> - lager:warning("[iot_host] feedback_result error: ~p", [Other]) - end, +handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) -> + #feedback_result{task_id = TaskId, time = Time, code = Code, reason = Reason, error = Error, task_type = Type} = FeedbackResult, + scene_feedback:insert(#{ + <<"task_id">> => TaskId, + <<"task_type">> => Type, + <<"code">> => Code, + <<"reason">> => Reason, + <<"error">> => Error, + <<"created_at">> => Time + }), {keep_state, State}; -handle_event(cast, {handle, {event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) -> - EventText = iot_cipher_aes:decrypt(AES, Event0), - lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, EventText]), - case catch jiffy:decode(EventText, [return_maps]) of - #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} -> - DevicePid = iot_device:get_pid(DeviceUUID), - iot_device:change_status(DevicePid, Status); - Event when is_map(Event) -> - lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); - Other -> - lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) - end, - {keep_state, State}; +%handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> +% lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]), +% #event{event_type = EventType, params = } = Event, +% +% case catch jiffy:decode(EventText, [return_maps]) of +% #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} -> +% DevicePid = iot_device:get_pid(DeviceUUID), +% iot_device:change_status(DevicePid, Status); +% Event when is_map(Event) -> +% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); +% Other -> +% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) +% end, +% {keep_state, State}; -handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) -> - EventText = iot_cipher_aes:decrypt(AES, Event0), - lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]), - case catch jiffy:decode(EventText, [return_maps]) of - #{<<"event_type">> := EventType, <<"params">> := Params0 = #{<<"device_uuid">> := DeviceUUID, <<"props">> := Props}} -> - case iot_device:is_alive(DeviceUUID) of - error -> - lager:notice("[iot_host] uuid: ~p, device_uuid: ~p is not alive, get ai_event: ~p", [UUID, DeviceUUID, EventText]), - ok; - {ok, DevicePid} -> - Params = maps:remove(<<"props">>, Params0), - {SceneId, MicroId} = parse_props(Props), - - %% 保存数据到mysql - Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])), - case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of - {ok, LogId} -> - iot_api:ai_event(LogId); - _ -> - ok - end, - iot_device:change_status(DevicePid, ?DEVICE_ONLINE), - - iot_ai_router:route_uuid(DeviceUUID, EventType, Params) - end; - Event when is_map(Event) -> - lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); - Other -> - lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) - end, - {keep_state, State}; +%handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> +% EventText = iot_cipher_aes:decrypt(AES, Event0), +% lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]), +% case catch jiffy:decode(EventText, [return_maps]) of +% #{<<"event_type">> := EventType, <<"params">> := Params0 = #{<<"device_uuid">> := DeviceUUID, <<"props">> := Props}} -> +% case iot_device:is_alive(DeviceUUID) of +% error -> +% lager:notice("[iot_host] uuid: ~p, device_uuid: ~p is not alive, get ai_event: ~p", [UUID, DeviceUUID, EventText]), +% ok; +% {ok, DevicePid} -> +% Params = maps:remove(<<"props">>, Params0), +% {SceneId, MicroId} = parse_props(Props), +% +% %% 保存数据到mysql +% Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])), +% case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of +% {ok, LogId} -> +% iot_api:ai_event(LogId); +% _ -> +% ok +% end, +% iot_device:change_status(DevicePid, ?DEVICE_ONLINE), +% +% iot_ai_router:route_uuid(DeviceUUID, EventType, Params) +% end; +% Event when is_map(Event) -> +% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); +% Other -> +% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) +% end, +% {keep_state, State}; %% 心跳机制 handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) -> @@ -557,7 +508,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== %% 处理相关数据 -handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) +handle_data(#data{device_uuid = DeviceUUID, service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID}) when is_binary(DeviceUUID), DeviceUUID /= <<>> -> case iot_device:get_pid(DeviceUUID) of @@ -580,7 +531,7 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName end end; -handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) -> +handle_data(#data{service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID}) -> %% 查找终端设备对应的点位信息 iot_router:route_uuid(UUID, FieldsList, Timestamp), @@ -610,11 +561,10 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) -> lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]). %% 将当前的state转换成map -state_map(#state{host_id = HostId, uuid = UUID, aes = Aes, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) -> +state_map(#state{host_id = HostId, uuid = UUID, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) -> #{ host_id => HostId, uuid => UUID, - aes => Aes, has_session => HasSession, heartbeat_counter => HeartbeatCounter, channel_pid => ChannelPid,