fix channel

This commit is contained in:
anlicheng 2023-08-10 15:01:49 +08:00
parent 4eefb7fc9f
commit ef6a81ce44

View File

@ -54,61 +54,48 @@ websocket_init(_State) ->
websocket_handle({binary, <<?PACKET_REQUEST, PacketId:32, ?METHOD_REGISTER:8, Data/binary>>}, State) -> websocket_handle({binary, <<?PACKET_REQUEST, PacketId:32, ?METHOD_REGISTER:8, Data/binary>>}, State) ->
#{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]), #{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]),
lager:debug("[ws_channel] register uuid: ~p, messag: ~p", [UUID, Data]), lager:debug("[ws_channel] register uuid: ~p, request message: ~p", [UUID, Data]),
case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of
true -> true ->
%% %%
case host_bo:ensured_host(UUID) of ok = host_bo:ensured_host(UUID),
ok -> %%
lager:debug("[ws_channel] register success, host uuid: ~p", [UUID]), {ok, HostPid} = iot_host_sup:ensured_host_started(UUID),
%% ok = iot_host:attach_channel(HostPid, self()),
{ok, HostPid} = iot_host_sup:ensured_host_started(UUID), %% host的monitor
ok = iot_host:attach_channel(HostPid, self()), erlang:monitor(process, HostPid),
%% host的monitor
erlang:monitor(process, HostPid),
Reply = jiffy:encode(#{ Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]),
<<"code">> => 1,
<<"message">> => <<"ok">>
}),
{reply, {binary, <<?PACKET_RESPONSE, PacketId:32, 0:8, Reply/binary>>}, State#state{uuid = UUID, host_pid = HostPid}};
{error, Reason} -> lager:debug("[ws_channel] register success, host uuid: ~p", [UUID]),
lager:warning("[ws_channel] register failed, uuid: ~p, reason: ~p", [UUID, Reason]),
{stop, State} {reply, {binary, <<?PACKET_RESPONSE, PacketId:32, 0:8, Reply/binary>>}, State#state{uuid = UUID, host_pid = HostPid}};
end;
false -> false ->
lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]),
{stop, State} {stop, State}
end; end;
websocket_handle({binary, <<?PACKET_REQUEST, PacketId:32, ?METHOD_CREATE_SESSION:8, PubKey/binary>>}, State = #state{host_pid = HostPid, uuid = UUID}) when is_pid(HostPid) -> websocket_handle({binary, <<?PACKET_REQUEST, PacketId:32, ?METHOD_CREATE_SESSION:8, PubKey/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
lager:debug("[ws_channel] create session, uuid: ~p", [UUID]),
{ok, Reply} = iot_host:create_session(HostPid, PubKey), {ok, Reply} = iot_host:create_session(HostPid, PubKey),
{reply, {binary, <<?PACKET_RESPONSE, PacketId:32, Reply/binary>>}, State}; {reply, {binary, <<?PACKET_RESPONSE, PacketId:32, Reply/binary>>}, State};
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_DATA:8, Data/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_DATA:8, Data/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
lager:debug("[ws_channel] data uuid: ~p, data: ~p", [UUID, Data]),
iot_host:handle(HostPid, {data, Data}), iot_host:handle(HostPid, {data, Data}),
{ok, State}; {ok, State};
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_PING:8, CipherMetric/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_PING:8, CipherMetric/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
lager:debug("[ws_channel] ping uuid: ~p", [UUID]),
iot_host:handle(HostPid, {ping, CipherMetric}), iot_host:handle(HostPid, {ping, CipherMetric}),
{ok, State}; {ok, State};
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_INFORM:8, CipherInfo/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_INFORM:8, CipherInfo/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
lager:debug("[ws_channel] inform uuid: ~p", [UUID]),
iot_host:handle(HostPid, {inform, CipherInfo}), iot_host:handle(HostPid, {inform, CipherInfo}),
{ok, State}; {ok, State};
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_FEEDBACK_STEP:8, CipherInfo/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_FEEDBACK_STEP:8, CipherInfo/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
lager:debug("[ws_channel] feedback step uuid: ~p", [UUID]),
iot_host:handle(HostPid, {feedback_step, CipherInfo}), iot_host:handle(HostPid, {feedback_step, CipherInfo}),
{ok, State}; {ok, State};
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_FEEDBACK_RESULT:8, CipherInfo/binary>>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_FEEDBACK_RESULT:8, CipherInfo/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
lager:debug("[ws_channel] feedback result uuid: ~p", [UUID]),
iot_host:handle(HostPid, {feedback_result, CipherInfo}), iot_host:handle(HostPid, {feedback_result, CipherInfo}),
{ok, State}; {ok, State};
@ -126,13 +113,13 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
true -> true ->
ReceiverPid ! {ws_response, Ref, Body}; ReceiverPid ! {ws_response, Ref, Body};
false -> false ->
lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is dead", [Body, PacketId]) lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId])
end, end,
{ok, State#state{inflight = NInflight}} {ok, State#state{inflight = NInflight}}
end; end;
websocket_handle(Info, State) -> websocket_handle(Info, State) ->
lager:error("[ws_channel] get a error messag: ~p, channel will close", [Info]), lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]),
{stop, State}. {stop, State}.
%% %%
@ -147,7 +134,7 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack
%% %%
websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) ->
lager:debug("[ws_channel] uuid: ~p, channel will close because user exited with reason: ~p", [UUID, Reason]), lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]),
{stop, State}; {stop, State};
%% %%