From cdb75fa4e38262e46ab65c6b076b02e5aeef0b89 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 17 Sep 2025 11:01:11 +0800 Subject: [PATCH] fix --- apps/efka/src/websocket/ws_channel.erl | 38 +++++++++++++------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/apps/efka/src/websocket/ws_channel.erl b/apps/efka/src/websocket/ws_channel.erl index 6e5d090..181be76 100644 --- a/apps/efka/src/websocket/ws_channel.erl +++ b/apps/efka/src/websocket/ws_channel.erl @@ -13,7 +13,7 @@ %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). --export([invoke/4, channel_reply/3]). +-export([invoke/4]). %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). @@ -30,21 +30,6 @@ invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. -%% 超时逻辑处理 -channel_reply(Id, Reply, Inflight) -> - case maps:take(Id, Inflight) of - error -> - Inflight; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, Reply}; - false -> - ok - end, - NInflight - end. - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -69,7 +54,7 @@ websocket_handle({binary, <>}, State = #state{i #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> {Id, {error, Error}} end, - NInflight = ws_channel:channel_reply(PacketId, Reply, Inflight), + NInflight = channel_reply(PacketId, Reply, Inflight), {ok, State#state{inflight = NInflight}}; websocket_handle(Info, State) -> @@ -93,7 +78,7 @@ websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> %% 超时逻辑处理 websocket_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) -> - NInflight = ws_channel:channel_reply(Id, {error, <<"timeout">>}, Inflight), + NInflight = channel_reply(Id, {error, <<"timeout">>}, Inflight), {ok, State#state{inflight = NInflight}}; %% service进程关闭 @@ -189,4 +174,19 @@ json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(M <<"message">> => Message } }, - jiffy:encode(Response, [force_utf8]). \ No newline at end of file + jiffy:encode(Response, [force_utf8]). + +%% 超时逻辑处理 +channel_reply(Id, Reply, Inflight) -> + case maps:take(Id, Inflight) of + error -> + Inflight; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, Reply}; + false -> + ok + end, + NInflight + end. \ No newline at end of file