This commit is contained in:
anlicheng 2023-09-19 11:14:31 +08:00
parent 455961ef86
commit a40bffe215

View File

@ -38,7 +38,6 @@
%% websocket相关
channel_pid :: undefined | pid(),
monitor_ref :: undefined | reference(),
%%
metrics = #{} :: map()
@ -231,52 +230,47 @@ handle_call({publish_message, _, _, _}, _From, State = #state{uuid = UUID}) ->
{reply, {error, <<"主机状态错误,发送命令失败"/utf8>>}, State};
%%
handle_call({activate, Auth}, _From, State = #state{uuid = UUID, host_id = HostId, monitor_ref = MRef, channel_pid = ChannelPid, has_session = false}) ->
handle_call({activate, Auth}, _From, State = #state{uuid = UUID, host_id = HostId, channel_pid = ChannelPid, has_session = false}) ->
case Auth of
true ->
ok;
false ->
%% channel也是要删除的
is_reference(MRef) andalso erlang:demonitor(MRef),
is_pid(ChannelPid) andalso ws_channel:stop(ChannelPid, closed),
is_pid(ChannelPid) andalso exit(ChannelPid, kill),
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
report_event(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_UNKNOWN)
end,
{reply, ok, State#state{monitor_ref = undefined, channel_pid = undefined}};
{reply, ok, State#state{channel_pid = undefined}};
handle_call({activate, Auth}, _From, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid, has_session = true}) ->
handle_call({activate, Auth}, _From, State = #state{host_id = HostId, uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
case Auth of
true ->
ok;
false ->
%% monitor
erlang:demonitor(MRef),
ws_channel:stop(ChannelPid, closed),
exit(ChannelPid, kill),
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
report_event(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_UNKNOWN)
end,
{reply, ok, State#state{monitor_ref = undefined, channel_pid = undefined, has_session = false}};
{reply, ok, State#state{channel_pid = undefined, has_session = false}};
%% channel
handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = undefined}) ->
lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]),
MRef = erlang:monitor(process, ChannelPid),
{reply, ok, State#state{channel_pid = ChannelPid, monitor_ref = MRef}};
erlang:monitor(process, ChannelPid),
handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, monitor_ref = OldMRef, channel_pid = OldChannelPid}) when is_pid(OldChannelPid) ->
{reply, ok, State#state{channel_pid = ChannelPid}};
handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) when is_pid(OldChannelPid) ->
lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel: ~p replace with: ~p", [UUID, OldChannelPid, ChannelPid]),
%% monitor
erlang:demonitor(OldMRef),
ws_channel:stop(OldChannelPid, closed),
%% channel的monitor
MRef = erlang:monitor(process, ChannelPid),
{reply, ok, State#state{channel_pid = ChannelPid, monitor_ref = MRef}};
erlang:monitor(process, ChannelPid),
{reply, ok, State#state{channel_pid = ChannelPid}};
%% 线
handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = Aes}) ->
@ -446,10 +440,10 @@ handle_cast(heartbeat, State = #state{uuid = UUID, heartbeat_counter = Heartbeat
%% 线: "未知"
handle_info({timeout, _, heartbeat_ticker}, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = 0, has_session = false}) ->
lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]),
change_devices_status(HostId, ?DEVICE_UNKNOWN),
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
report_event(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_UNKNOWN),
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
{noreply, State#state{heartbeat_counter = 0}};
%%
@ -458,10 +452,14 @@ handle_info({timeout, _, heartbeat_ticker}, State = #state{}) ->
{noreply, State#state{heartbeat_counter = 0}};
%% websocket断开的时候
handle_info({'DOWN', Ref, process, ChannelPid, Reason}, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid, has_session = HasSession}) ->
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]),
{noreply, State#state{channel_pid = undefined, has_session = false}};
handle_info({'DOWN', _Ref, process, Pid, Reason}, State = #state{uuid = UUID}) ->
lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]),
{noreply, State};
handle_info(Info, State = #state{has_session = HasSession}) ->
lager:warning("[iot_host] unknown info: ~p, state: ~p", [Info, HasSession]),
@ -547,7 +545,7 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]).
%% state转换成map
state_map(#state{host_id = HostId, uuid = UUID, aes = Aes, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, monitor_ref = MonitorRef, metrics = Metrics}) ->
state_map(#state{host_id = HostId, uuid = UUID, aes = Aes, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) ->
#{
host_id => HostId,
uuid => UUID,
@ -555,6 +553,5 @@ state_map(#state{host_id = HostId, uuid = UUID, aes = Aes, has_session = HasSess
has_session => HasSession,
heartbeat_counter => HeartbeatCounter,
channel_pid => ChannelPid,
monitor_ref => MonitorRef,
metrics => Metrics
}.