This commit is contained in:
anlicheng 2025-09-17 11:01:11 +08:00
parent 71297abd93
commit cdb75fa4e3

View File

@ -13,7 +13,7 @@
%% API
-export([init/2]).
-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]).
-export([invoke/4, channel_reply/3]).
-export([invoke/4]).
%%
-define(PENDING_TIMEOUT, 10 * 1000).
@ -30,21 +30,6 @@
invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) ->
ChannelPid ! {invoke, Ref, ReceiverPid, Payload}.
%%
channel_reply(Id, Reply, Inflight) ->
case maps:take(Id, Inflight) of
error ->
Inflight;
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {channel_reply, Ref, Reply};
false ->
ok
end,
NInflight
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -69,7 +54,7 @@ websocket_handle({binary, <<?PACKET_RESPONSE:8, Data/binary>>}, State = #state{i
#{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} ->
{Id, {error, Error}}
end,
NInflight = ws_channel:channel_reply(PacketId, Reply, Inflight),
NInflight = channel_reply(PacketId, Reply, Inflight),
{ok, State#state{inflight = NInflight}};
websocket_handle(Info, State) ->
@ -93,7 +78,7 @@ websocket_info({topic_broadcast, Topic, Content}, State = #state{}) ->
%%
websocket_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) ->
NInflight = ws_channel:channel_reply(Id, {error, <<"timeout">>}, Inflight),
NInflight = channel_reply(Id, {error, <<"timeout">>}, Inflight),
{ok, State#state{inflight = NInflight}};
%% service进程关闭
@ -189,4 +174,19 @@ json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(M
<<"message">> => Message
}
},
jiffy:encode(Response, [force_utf8]).
jiffy:encode(Response, [force_utf8]).
%%
channel_reply(Id, Reply, Inflight) ->
case maps:take(Id, Inflight) of
error ->
Inflight;
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {channel_reply, Ref, Reply};
false ->
ok
end,
NInflight
end.