fix channel

This commit is contained in:
anlicheng 2025-08-27 15:07:46 +08:00
parent f991ea2fac
commit 3a8c621f7b
2 changed files with 37 additions and 60 deletions

View File

@ -127,35 +127,24 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST:8, Data/binary>>}, State = #state{so
%% micro-client:response => efka %% micro-client:response => efka
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{socket = Socket, inflight = Inflight}) -> handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
Resp = jiffy:decode(Data, [return_maps]), Resp = jiffy:decode(Data, [return_maps]),
case Resp of {PacketId, Reply} = case Resp of
#{<<"id">> := Id, <<"result">> := Result} -> #{<<"id">> := Id, <<"result">> := Result} ->
case maps:take(Id, Inflight) of {Id, {ok, Result}};
error -> #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} ->
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), {Id, {error, Error}}
{noreply, State}; end,
{{ReceiverPid, Ref}, NInflight} -> case maps:take(PacketId, Inflight) of
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of error ->
true -> lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, PacketId]),
ReceiverPid ! {channel_reply, Ref, {ok, Result}}; {noreply, State};
false -> {{ReceiverPid, Ref}, NInflight} ->
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
end, true ->
{noreply, State#state{inflight = NInflight}} ReceiverPid ! {channel_reply, Ref, Reply};
end; false ->
#{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, PacketId])
case maps:take(Id, Inflight) of end,
error -> {noreply, State#state{inflight = NInflight}}
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]),
{noreply, State};
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {channel_reply, Ref, {error, Error}};
false ->
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id])
end,
{noreply, State#state{inflight = NInflight}}
end
end; end;
%% %%

View File

@ -36,8 +36,6 @@ push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), i
invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) ->
ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. ChannelPid ! {invoke, Ref, ReceiverPid, Payload}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% %%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -57,36 +55,26 @@ websocket_handle({binary, <<?PACKET_REQUEST, Data/binary>>}, State) ->
%% micro-client:response => efka %% micro-client:response => efka
websocket_handle({binary, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{inflight = Inflight}) -> websocket_handle({binary, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{inflight = Inflight}) ->
Resp = jiffy:decode(Data, [return_maps]), Resp = jiffy:decode(Data, [return_maps]),
case Resp of {PacketId, Reply} = case Resp of
#{<<"id">> := Id, <<"result">> := Result} -> #{<<"id">> := Id, <<"result">> := Result} ->
case maps:take(Id, Inflight) of {Id, {ok, Result}};
error -> #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} ->
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), {Id, {error, Error}}
{ok, State}; end,
{{ReceiverPid, Ref}, NInflight} -> case maps:take(PacketId, Inflight) of
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of error ->
true -> lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, PacketId]),
ReceiverPid ! {channel_reply, Ref, {ok, Result}}; {ok, State};
false -> {{ReceiverPid, Ref}, NInflight} ->
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
end, true ->
{ok, State#state{inflight = NInflight}} ReceiverPid ! {channel_reply, Ref, Reply};
end; false ->
#{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, PacketId])
case maps:take(Id, Inflight) of end,
error -> {ok, State#state{inflight = NInflight}}
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]),
{ok, State};
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {channel_reply, Ref, {error, Error}};
false ->
lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id])
end,
{ok, State#state{inflight = NInflight}}
end
end; end;
websocket_handle(Info, State) -> websocket_handle(Info, State) ->
lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]),
{stop, State}. {stop, State}.