From 3a8c621f7b4f5a92cd64df7b331c0fcdecf1b5dc Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 27 Aug 2025 15:07:46 +0800 Subject: [PATCH] fix channel --- apps/efka/src/tcp_server/tcp_channel.erl | 47 +++++++---------- apps/efka/src/websocket_server/ws_channel.erl | 50 +++++++------------ 2 files changed, 37 insertions(+), 60 deletions(-) diff --git a/apps/efka/src/tcp_server/tcp_channel.erl b/apps/efka/src/tcp_server/tcp_channel.erl index fecc64f..4247916 100644 --- a/apps/efka/src/tcp_server/tcp_channel.erl +++ b/apps/efka/src/tcp_server/tcp_channel.erl @@ -127,35 +127,24 @@ handle_info({tcp, Socket, <>}, State = #state{so %% 处理micro-client:response => efka 的响应 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> Resp = jiffy:decode(Data, [return_maps]), - case Resp of - #{<<"id">> := Id, <<"result">> := Result} -> - case maps:take(Id, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), - {noreply, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, {ok, Result}}; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) - end, - {noreply, State#state{inflight = NInflight}} - end; - #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> - case maps:take(Id, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), - {noreply, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, {error, Error}}; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) - end, - {noreply, State#state{inflight = NInflight}} - end + {PacketId, Reply} = case Resp of + #{<<"id">> := Id, <<"result">> := Result} -> + {Id, {ok, Result}}; + #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> + {Id, {error, Error}} + end, + case maps:take(PacketId, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, PacketId]), + {noreply, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, Reply}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, PacketId]) + end, + {noreply, State#state{inflight = NInflight}} end; %% 超时逻辑处理 diff --git a/apps/efka/src/websocket_server/ws_channel.erl b/apps/efka/src/websocket_server/ws_channel.erl index 22f3e57..aed9d43 100644 --- a/apps/efka/src/websocket_server/ws_channel.erl +++ b/apps/efka/src/websocket_server/ws_channel.erl @@ -36,8 +36,6 @@ push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), i invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. - - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -57,36 +55,26 @@ websocket_handle({binary, <>}, State) -> %% 处理micro-client:response => efka 的响应 websocket_handle({binary, <>}, State = #state{inflight = Inflight}) -> Resp = jiffy:decode(Data, [return_maps]), - case Resp of - #{<<"id">> := Id, <<"result">> := Result} -> - case maps:take(Id, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), - {ok, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, {ok, Result}}; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) - end, - {ok, State#state{inflight = NInflight}} - end; - #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> - case maps:take(Id, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), - {ok, State}; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, {error, Error}}; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) - end, - {ok, State#state{inflight = NInflight}} - end + {PacketId, Reply} = case Resp of + #{<<"id">> := Id, <<"result">> := Result} -> + {Id, {ok, Result}}; + #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> + {Id, {error, Error}} + end, + case maps:take(PacketId, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, PacketId]), + {ok, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, Reply}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, PacketId]) + end, + {ok, State#state{inflight = NInflight}} end; + websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}.