From cd09698e3c175347e18b8252b06271c68498c0fa Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 7 May 2025 18:27:48 +0800 Subject: [PATCH] simple device --- apps/iot/src/iot_device1.erl | 124 +++++++++++++++++++++++++++++++ apps/iot/src/iot_host.erl | 103 ++++--------------------- apps/iot/src/iot_tcp_channel.erl | 22 +----- 3 files changed, 142 insertions(+), 107 deletions(-) create mode 100644 apps/iot/src/iot_device1.erl diff --git a/apps/iot/src/iot_device1.erl b/apps/iot/src/iot_device1.erl new file mode 100644 index 0000000..f67a492 --- /dev/null +++ b/apps/iot/src/iot_device1.erl @@ -0,0 +1,124 @@ +%%%------------------------------------------------------------------- +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 14. 8月 2023 11:40 +%%%------------------------------------------------------------------- +-module(iot_device1). +-author("aresei"). +-include("iot.hrl"). + +%% API +-export([new/1, is_activated/1, change_status/2, reload/1, auth/2]). + +%% 终端是否授权 +-define(DEVICE_AUTH_DENIED, 0). +-define(DEVICE_AUTH_AUTHED, 1). + +%% 状态 +-define(STATE_DENIED, denied). +-define(STATE_ACTIVATED, activated). + +-record(device, { + device_uuid :: binary(), + auth_state = ?STATE_DENIED, + status = ?DEVICE_OFFLINE +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec new(DeviceUUID :: binary()) -> error | {ok, Device :: #device{}}. +new(DeviceUUID) when is_binary(DeviceUUID) -> + case device_bo:get_device_by_uuid(DeviceUUID) of + {ok, #{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> + {ok, #device{device_uuid = DeviceUUID, status = Status, auth_state = auth_state(AuthorizeStatus)}}; + undefined -> + lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), + error + end. + +-spec is_activated(Device :: #device{}) -> boolean(). +is_activated(#device{auth_state = AuthState}) -> + AuthState =:= ?STATE_ACTIVATED. + +-spec change_status(Device :: #device{}, NewStatus :: integer()) -> NDevice :: #device{}. +change_status(Device = #device{status = Status}, NewStatus) when is_integer(NewStatus), Status =:= NewStatus -> + Device; +change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_ONLINE) -> + {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE), + report_event(DeviceUUID, ?DEVICE_ONLINE), + Device#device{status = ?DEVICE_ONLINE}; +change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) -> + {ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID), + case Status of + ?DEVICE_NOT_JOINED -> + lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]), + Device#device{status = ?DEVICE_NOT_JOINED}; + ?DEVICE_OFFLINE -> + lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]), + Device#device{status = ?DEVICE_OFFLINE}; + ?DEVICE_ONLINE -> + {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), + report_event(DeviceUUID, ?DEVICE_OFFLINE), + Device#device{status = ?DEVICE_OFFLINE} + end. + +-spec reload(Device :: #device{}) -> error | {ok, NDevice :: #device{}}. +reload(Device = #device{device_uuid = DeviceUUID}) -> + lager:debug("[iot_device] will reload: ~p", [DeviceUUID]), + case device_bo:get_device_by_uuid(DeviceUUID) of + {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> + {ok, Device#device{device_uuid = DeviceUUID, status = Status, auth_state = auth_state(AuthorizeStatus)}}; + undefined -> + lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), + error + end. + +-spec auth(Device :: #device{}, Auth :: boolean()) -> NDevice :: #device{}. +auth(Device = #device{auth_state = StateName, device_uuid = DeviceUUID}, Auth) when is_boolean(Auth) -> + case {StateName, Auth} of + {?STATE_DENIED, false} -> + lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]), + Device; + {?STATE_DENIED, true} -> + Device#device{auth_state = ?STATE_ACTIVATED}; + {?STATE_ACTIVATED, false} -> + lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]), + Device#device{auth_state = ?STATE_DENIED}; + {?STATE_ACTIVATED, true} -> + lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]), + Device + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec auth_state(integer()) -> atom(). +auth_state(?DEVICE_AUTH_AUTHED) -> + ?STATE_ACTIVATED; +auth_state(?DEVICE_AUTH_DENIED) -> + ?STATE_DENIED. + +-spec report_event(DeviceUUID :: binary(), NewStatus :: integer()) -> no_return(). +report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewStatus) -> + TextMap = #{ + 0 => <<"离线"/utf8>>, + 1 => <<"在线"/utf8>> + }, + %% 设备的状态信息上报给中电 + Timestamp = iot_util:timestamp_of_seconds(), + FieldsList = [#{ + <<"key">> => <<"device_status">>, + <<"value">> => NewStatus, + <<"value_text">> => maps:get(NewStatus, TextMap), + <<"unit">> => 0, + <<"type">> => <<"DI">>, + <<"name">> => <<"设备状态"/utf8>>, + <<"timestamp">> => Timestamp + }], + iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), + lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index be727b9..a0f5783 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -145,7 +145,6 @@ init([UUID]) -> %% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid AliasName = get_alias_name(HostId), global:register_name(AliasName, self()), - Aes = list_to_binary(iot_util:rand_bytes(32)), %% 心跳检测机制 erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), @@ -333,91 +332,28 @@ handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{u {keep_state, State#state{metrics = Metrics}}; handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, has_session = true}) -> - #service_inform{name = Name, props = Props, version = Version, version_copy = VersionCopy, status = Status, at = At} = Inform, + #service_inform{service_id = ServiceId, status = Status, timestamp = Timestamp} = Inform, lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, Inform]), %% props 主机id:场景id:微服务id - {SceneId, MicroId} = parse_props(Props), - micro_inform_log:insert(#{ - <<"host_id">> => HostId, - <<"scene_id">> => SceneId, - <<"service_name">> => Name, - <<"version">> => Version, - <<"version_copy">> => VersionCopy, - <<"status">> => Status, - <<"created_at">> => At - }), - micro_set_bo:change_status(HostId, SceneId, MicroId, Status), - {keep_state, State}; - -handle_event(cast, {handle, {feedback_step, FeedbackStep}}, ?STATE_ACTIVATED, State = #state{has_session = true}) -> - #feedback_step{task_id = TaskId, code = Code} = FeedbackStep, - Result = scene_feedback_step:insert(#{ - <<"task_id">> => TaskId, - <<"code">> => Code, - <<"created_at">> => iot_util:current_time() - }), - lager:debug("[iot_host] feedback_step: ~p, insert result: ~p", [FeedbackStep, Result]), - {keep_state, State}; handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) -> - #feedback_result{task_id = TaskId, time = Time, code = Code, reason = Reason, error = Error, task_type = Type} = FeedbackResult, - scene_feedback:insert(#{ - <<"task_id">> => TaskId, - <<"task_type">> => Type, - <<"code">> => Code, - <<"reason">> => Reason, - <<"error">> => Error, - <<"created_at">> => Time - }), + #feedback_phase{task_id = TaskId} = FeedbackResult, {keep_state, State}; -%handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> -% lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]), -% #event{event_type = EventType, params = } = Event, -% -% case catch jiffy:decode(EventText, [return_maps]) of -% #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} -> -% DevicePid = iot_device:get_pid(DeviceUUID), -% iot_device:change_status(DevicePid, Status); -% Event when is_map(Event) -> -% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); -% Other -> -% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) -% end, -% {keep_state, State}; +handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> + lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]), + #event{service_id = ServiceId, event_type = EventType, params = Params} = Event, -%handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> -% EventText = iot_cipher_aes:decrypt(AES, Event0), -% lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]), -% case catch jiffy:decode(EventText, [return_maps]) of -% #{<<"event_type">> := EventType, <<"params">> := Params0 = #{<<"device_uuid">> := DeviceUUID, <<"props">> := Props}} -> -% case iot_device:is_alive(DeviceUUID) of -% error -> -% lager:notice("[iot_host] uuid: ~p, device_uuid: ~p is not alive, get ai_event: ~p", [UUID, DeviceUUID, EventText]), -% ok; -% {ok, DevicePid} -> -% Params = maps:remove(<<"props">>, Params0), -% {SceneId, MicroId} = parse_props(Props), -% -% %% 保存数据到mysql -% Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])), -% case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of -% {ok, LogId} -> -% iot_api:ai_event(LogId); -% _ -> -% ok -% end, -% iot_device:change_status(DevicePid, ?DEVICE_ONLINE), -% -% iot_ai_router:route_uuid(DeviceUUID, EventType, Params) -% end; -% Event when is_map(Event) -> -% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); -% Other -> -% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) -% end, -% {keep_state, State}; + %DevicePid = iot_device:get_pid(DeviceUUID), + %iot_device:change_status(DevicePid, Status), + + {keep_state, State}; + +handle_event(cast, {handle, {ai_event, AIEventPB}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> + #ai_event{service_id = ServiceId, event_type = EventType, params = Params} = message_pb:decode_msg(AIEventPB, ai_event), + lager:debug("[iot_host] uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), + {keep_state, State}; %% 心跳机制 handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) -> @@ -541,13 +477,4 @@ state_map(#state{host_id = HostId, uuid = UUID, has_session = HasSession, heartb heartbeat_counter => HeartbeatCounter, channel_pid => ChannelPid, metrics => Metrics - }. - -%% props 主机id:场景id:微服务id --spec parse_props(Props :: undefined | binary()) -> {SceneId :: integer(), MicroId :: integer()}. -parse_props(Props) when is_binary(Props) -> - %% props 主机id:场景id:微服务id - [_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]), - SceneId = binary_to_integer(SceneId0), - MicroId = binary_to_integer(MicroId0), - {SceneId, MicroId}. \ No newline at end of file + }. \ No newline at end of file diff --git a/apps/iot/src/iot_tcp_channel.erl b/apps/iot/src/iot_tcp_channel.erl index eeeb075..5248e95 100644 --- a/apps/iot/src/iot_tcp_channel.erl +++ b/apps/iot/src/iot_tcp_channel.erl @@ -100,35 +100,21 @@ handle_info({tcp, Socket, < %% 建立到host的monitor erlang:monitor(process, HostPid), - AuthReplyBin = message_pb:encode_msg(#auth_reply{ - code = 1, - message = <<"ok">>, - repository_url = <<"https://www.baidu.com">> - }), + AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = <<"ok">>}), Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {denied, Reason} when is_binary(Reason) -> erlang:monitor(process, HostPid), - AuthReplyBin = message_pb:encode_msg(#auth_reply{ - code = -1, - message = Reason, - repository_url = <<"">> - }), + AuthReplyBin = message_pb:encode_msg(#auth_reply{code = -1, message = Reason}), Transport:send(Socket, <>), - lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} when is_binary(Reason) -> - AuthReplyBin = message_pb:encode_msg(#auth_reply{ - code = -2, - message = Reason, - repository_url = <<"">> - }), + AuthReplyBin = message_pb:encode_msg(#auth_reply{code = -2, message = Reason}), Transport:send(Socket, <>), - lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {stop, State} @@ -183,8 +169,6 @@ handle_info({tcp, Socket, <> {noreply, State}; {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true when Body == <<>> -> - ReceiverPid ! {ws_response, Ref}; true -> ReceiverPid ! {ws_response, Ref, Body}; false ->