simple device
This commit is contained in:
parent
75260f73c0
commit
cd09698e3c
124
apps/iot/src/iot_device1.erl
Normal file
124
apps/iot/src/iot_device1.erl
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 14. 8月 2023 11:40
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_device1).
|
||||||
|
-author("aresei").
|
||||||
|
-include("iot.hrl").
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([new/1, is_activated/1, change_status/2, reload/1, auth/2]).
|
||||||
|
|
||||||
|
%% 终端是否授权
|
||||||
|
-define(DEVICE_AUTH_DENIED, 0).
|
||||||
|
-define(DEVICE_AUTH_AUTHED, 1).
|
||||||
|
|
||||||
|
%% 状态
|
||||||
|
-define(STATE_DENIED, denied).
|
||||||
|
-define(STATE_ACTIVATED, activated).
|
||||||
|
|
||||||
|
-record(device, {
|
||||||
|
device_uuid :: binary(),
|
||||||
|
auth_state = ?STATE_DENIED,
|
||||||
|
status = ?DEVICE_OFFLINE
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec new(DeviceUUID :: binary()) -> error | {ok, Device :: #device{}}.
|
||||||
|
new(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||||
|
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||||
|
{ok, #{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} ->
|
||||||
|
{ok, #device{device_uuid = DeviceUUID, status = Status, auth_state = auth_state(AuthorizeStatus)}};
|
||||||
|
undefined ->
|
||||||
|
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec is_activated(Device :: #device{}) -> boolean().
|
||||||
|
is_activated(#device{auth_state = AuthState}) ->
|
||||||
|
AuthState =:= ?STATE_ACTIVATED.
|
||||||
|
|
||||||
|
-spec change_status(Device :: #device{}, NewStatus :: integer()) -> NDevice :: #device{}.
|
||||||
|
change_status(Device = #device{status = Status}, NewStatus) when is_integer(NewStatus), Status =:= NewStatus ->
|
||||||
|
Device;
|
||||||
|
change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_ONLINE) ->
|
||||||
|
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE),
|
||||||
|
report_event(DeviceUUID, ?DEVICE_ONLINE),
|
||||||
|
Device#device{status = ?DEVICE_ONLINE};
|
||||||
|
change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) ->
|
||||||
|
{ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID),
|
||||||
|
case Status of
|
||||||
|
?DEVICE_NOT_JOINED ->
|
||||||
|
lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]),
|
||||||
|
Device#device{status = ?DEVICE_NOT_JOINED};
|
||||||
|
?DEVICE_OFFLINE ->
|
||||||
|
lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]),
|
||||||
|
Device#device{status = ?DEVICE_OFFLINE};
|
||||||
|
?DEVICE_ONLINE ->
|
||||||
|
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
|
||||||
|
report_event(DeviceUUID, ?DEVICE_OFFLINE),
|
||||||
|
Device#device{status = ?DEVICE_OFFLINE}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec reload(Device :: #device{}) -> error | {ok, NDevice :: #device{}}.
|
||||||
|
reload(Device = #device{device_uuid = DeviceUUID}) ->
|
||||||
|
lager:debug("[iot_device] will reload: ~p", [DeviceUUID]),
|
||||||
|
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||||
|
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} ->
|
||||||
|
{ok, Device#device{device_uuid = DeviceUUID, status = Status, auth_state = auth_state(AuthorizeStatus)}};
|
||||||
|
undefined ->
|
||||||
|
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec auth(Device :: #device{}, Auth :: boolean()) -> NDevice :: #device{}.
|
||||||
|
auth(Device = #device{auth_state = StateName, device_uuid = DeviceUUID}, Auth) when is_boolean(Auth) ->
|
||||||
|
case {StateName, Auth} of
|
||||||
|
{?STATE_DENIED, false} ->
|
||||||
|
lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]),
|
||||||
|
Device;
|
||||||
|
{?STATE_DENIED, true} ->
|
||||||
|
Device#device{auth_state = ?STATE_ACTIVATED};
|
||||||
|
{?STATE_ACTIVATED, false} ->
|
||||||
|
lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]),
|
||||||
|
Device#device{auth_state = ?STATE_DENIED};
|
||||||
|
{?STATE_ACTIVATED, true} ->
|
||||||
|
lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]),
|
||||||
|
Device
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec auth_state(integer()) -> atom().
|
||||||
|
auth_state(?DEVICE_AUTH_AUTHED) ->
|
||||||
|
?STATE_ACTIVATED;
|
||||||
|
auth_state(?DEVICE_AUTH_DENIED) ->
|
||||||
|
?STATE_DENIED.
|
||||||
|
|
||||||
|
-spec report_event(DeviceUUID :: binary(), NewStatus :: integer()) -> no_return().
|
||||||
|
report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewStatus) ->
|
||||||
|
TextMap = #{
|
||||||
|
0 => <<"离线"/utf8>>,
|
||||||
|
1 => <<"在线"/utf8>>
|
||||||
|
},
|
||||||
|
%% 设备的状态信息上报给中电
|
||||||
|
Timestamp = iot_util:timestamp_of_seconds(),
|
||||||
|
FieldsList = [#{
|
||||||
|
<<"key">> => <<"device_status">>,
|
||||||
|
<<"value">> => NewStatus,
|
||||||
|
<<"value_text">> => maps:get(NewStatus, TextMap),
|
||||||
|
<<"unit">> => 0,
|
||||||
|
<<"type">> => <<"DI">>,
|
||||||
|
<<"name">> => <<"设备状态"/utf8>>,
|
||||||
|
<<"timestamp">> => Timestamp
|
||||||
|
}],
|
||||||
|
iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
|
||||||
|
lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
|
||||||
@ -145,7 +145,6 @@ init([UUID]) ->
|
|||||||
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
||||||
AliasName = get_alias_name(HostId),
|
AliasName = get_alias_name(HostId),
|
||||||
global:register_name(AliasName, self()),
|
global:register_name(AliasName, self()),
|
||||||
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
|
||||||
|
|
||||||
%% 心跳检测机制
|
%% 心跳检测机制
|
||||||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
@ -333,91 +332,28 @@ handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{u
|
|||||||
{keep_state, State#state{metrics = Metrics}};
|
{keep_state, State#state{metrics = Metrics}};
|
||||||
|
|
||||||
handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, has_session = true}) ->
|
handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, has_session = true}) ->
|
||||||
#service_inform{name = Name, props = Props, version = Version, version_copy = VersionCopy, status = Status, at = At} = Inform,
|
#service_inform{service_id = ServiceId, status = Status, timestamp = Timestamp} = Inform,
|
||||||
lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, Inform]),
|
lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, Inform]),
|
||||||
%% props 主机id:场景id:微服务id
|
%% props 主机id:场景id:微服务id
|
||||||
{SceneId, MicroId} = parse_props(Props),
|
|
||||||
micro_inform_log:insert(#{
|
|
||||||
<<"host_id">> => HostId,
|
|
||||||
<<"scene_id">> => SceneId,
|
|
||||||
<<"service_name">> => Name,
|
|
||||||
<<"version">> => Version,
|
|
||||||
<<"version_copy">> => VersionCopy,
|
|
||||||
<<"status">> => Status,
|
|
||||||
<<"created_at">> => At
|
|
||||||
}),
|
|
||||||
micro_set_bo:change_status(HostId, SceneId, MicroId, Status),
|
|
||||||
{keep_state, State};
|
|
||||||
|
|
||||||
handle_event(cast, {handle, {feedback_step, FeedbackStep}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
|
|
||||||
#feedback_step{task_id = TaskId, code = Code} = FeedbackStep,
|
|
||||||
Result = scene_feedback_step:insert(#{
|
|
||||||
<<"task_id">> => TaskId,
|
|
||||||
<<"code">> => Code,
|
|
||||||
<<"created_at">> => iot_util:current_time()
|
|
||||||
}),
|
|
||||||
lager:debug("[iot_host] feedback_step: ~p, insert result: ~p", [FeedbackStep, Result]),
|
|
||||||
|
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
|
handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
|
||||||
#feedback_result{task_id = TaskId, time = Time, code = Code, reason = Reason, error = Error, task_type = Type} = FeedbackResult,
|
#feedback_phase{task_id = TaskId} = FeedbackResult,
|
||||||
scene_feedback:insert(#{
|
|
||||||
<<"task_id">> => TaskId,
|
|
||||||
<<"task_type">> => Type,
|
|
||||||
<<"code">> => Code,
|
|
||||||
<<"reason">> => Reason,
|
|
||||||
<<"error">> => Error,
|
|
||||||
<<"created_at">> => Time
|
|
||||||
}),
|
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
||||||
% lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]),
|
lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]),
|
||||||
% #event{event_type = EventType, params = } = Event,
|
#event{service_id = ServiceId, event_type = EventType, params = Params} = Event,
|
||||||
%
|
|
||||||
% case catch jiffy:decode(EventText, [return_maps]) of
|
|
||||||
% #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} ->
|
|
||||||
% DevicePid = iot_device:get_pid(DeviceUUID),
|
|
||||||
% iot_device:change_status(DevicePid, Status);
|
|
||||||
% Event when is_map(Event) ->
|
|
||||||
% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
|
|
||||||
% Other ->
|
|
||||||
% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
|
|
||||||
% end,
|
|
||||||
% {keep_state, State};
|
|
||||||
|
|
||||||
%handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
%DevicePid = iot_device:get_pid(DeviceUUID),
|
||||||
% EventText = iot_cipher_aes:decrypt(AES, Event0),
|
%iot_device:change_status(DevicePid, Status),
|
||||||
% lager:debug("[iot_host] uuid: ~p, get ai_event: ~p", [UUID, EventText]),
|
|
||||||
% case catch jiffy:decode(EventText, [return_maps]) of
|
{keep_state, State};
|
||||||
% #{<<"event_type">> := EventType, <<"params">> := Params0 = #{<<"device_uuid">> := DeviceUUID, <<"props">> := Props}} ->
|
|
||||||
% case iot_device:is_alive(DeviceUUID) of
|
handle_event(cast, {handle, {ai_event, AIEventPB}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
||||||
% error ->
|
#ai_event{service_id = ServiceId, event_type = EventType, params = Params} = message_pb:decode_msg(AIEventPB, ai_event),
|
||||||
% lager:notice("[iot_host] uuid: ~p, device_uuid: ~p is not alive, get ai_event: ~p", [UUID, DeviceUUID, EventText]),
|
lager:debug("[iot_host] uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
|
||||||
% ok;
|
{keep_state, State};
|
||||||
% {ok, DevicePid} ->
|
|
||||||
% Params = maps:remove(<<"props">>, Params0),
|
|
||||||
% {SceneId, MicroId} = parse_props(Props),
|
|
||||||
%
|
|
||||||
% %% 保存数据到mysql
|
|
||||||
% Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
|
|
||||||
% case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of
|
|
||||||
% {ok, LogId} ->
|
|
||||||
% iot_api:ai_event(LogId);
|
|
||||||
% _ ->
|
|
||||||
% ok
|
|
||||||
% end,
|
|
||||||
% iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
|
||||||
%
|
|
||||||
% iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
|
|
||||||
% end;
|
|
||||||
% Event when is_map(Event) ->
|
|
||||||
% lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]);
|
|
||||||
% Other ->
|
|
||||||
% lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
|
|
||||||
% end,
|
|
||||||
% {keep_state, State};
|
|
||||||
|
|
||||||
%% 心跳机制
|
%% 心跳机制
|
||||||
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
|
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
|
||||||
@ -541,13 +477,4 @@ state_map(#state{host_id = HostId, uuid = UUID, has_session = HasSession, heartb
|
|||||||
heartbeat_counter => HeartbeatCounter,
|
heartbeat_counter => HeartbeatCounter,
|
||||||
channel_pid => ChannelPid,
|
channel_pid => ChannelPid,
|
||||||
metrics => Metrics
|
metrics => Metrics
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%% props 主机id:场景id:微服务id
|
|
||||||
-spec parse_props(Props :: undefined | binary()) -> {SceneId :: integer(), MicroId :: integer()}.
|
|
||||||
parse_props(Props) when is_binary(Props) ->
|
|
||||||
%% props 主机id:场景id:微服务id
|
|
||||||
[_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]),
|
|
||||||
SceneId = binary_to_integer(SceneId0),
|
|
||||||
MicroId = binary_to_integer(MicroId0),
|
|
||||||
{SceneId, MicroId}.
|
|
||||||
@ -100,35 +100,21 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, AuthRe
|
|||||||
ok ->
|
ok ->
|
||||||
%% 建立到host的monitor
|
%% 建立到host的monitor
|
||||||
erlang:monitor(process, HostPid),
|
erlang:monitor(process, HostPid),
|
||||||
AuthReplyBin = message_pb:encode_msg(#auth_reply{
|
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = <<"ok">>}),
|
||||||
code = 1,
|
|
||||||
message = <<"ok">>,
|
|
||||||
repository_url = <<"https://www.baidu.com">>
|
|
||||||
}),
|
|
||||||
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
|
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
|
||||||
|
|
||||||
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
|
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
|
||||||
{denied, Reason} when is_binary(Reason) ->
|
{denied, Reason} when is_binary(Reason) ->
|
||||||
erlang:monitor(process, HostPid),
|
erlang:monitor(process, HostPid),
|
||||||
AuthReplyBin = message_pb:encode_msg(#auth_reply{
|
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = -1, message = Reason}),
|
||||||
code = -1,
|
|
||||||
message = Reason,
|
|
||||||
repository_url = <<"">>
|
|
||||||
}),
|
|
||||||
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
|
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
|
||||||
|
|
||||||
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
|
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
|
||||||
|
|
||||||
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
|
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
|
||||||
|
|
||||||
{error, Reason} when is_binary(Reason) ->
|
{error, Reason} when is_binary(Reason) ->
|
||||||
AuthReplyBin = message_pb:encode_msg(#auth_reply{
|
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = -2, message = Reason}),
|
||||||
code = -2,
|
|
||||||
message = Reason,
|
|
||||||
repository_url = <<"">>
|
|
||||||
}),
|
|
||||||
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
|
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
|
||||||
|
|
||||||
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
|
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
|
||||||
|
|
||||||
{stop, State}
|
{stop, State}
|
||||||
@ -183,8 +169,6 @@ handle_info({tcp, Socket, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
{{ReceiverPid, Ref}, NInflight} ->
|
{{ReceiverPid, Ref}, NInflight} ->
|
||||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
true when Body == <<>> ->
|
|
||||||
ReceiverPid ! {ws_response, Ref};
|
|
||||||
true ->
|
true ->
|
||||||
ReceiverPid ! {ws_response, Ref, Body};
|
ReceiverPid ! {ws_response, Ref, Body};
|
||||||
false ->
|
false ->
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user