diff --git a/apps/efka/src/gen_channel.erl b/apps/efka/src/gen_channel.erl new file mode 100644 index 0000000..5071f1a --- /dev/null +++ b/apps/efka/src/gen_channel.erl @@ -0,0 +1,68 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 27. 8月 2025 15:22 +%%%------------------------------------------------------------------- +-module(gen_channel). +-author("anlicheng"). +-include("efka_service.hrl"). + +-export([push_config/4, invoke/4, channel_reply/3]). +-export([next_packet_id/1]). +-export([json_result/2, json_error/3]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). +push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> + ChannelPid ! {push_config, Ref, ReceiverPid, ConfigJson}. + +-spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). +invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> + ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. + +%% 超时逻辑处理 +channel_reply(Id, Reply, Inflight) -> + case maps:take(Id, Inflight) of + error -> + Inflight; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, Reply}; + false -> + ok + end, + NInflight + end. + +%% 采用32位编码 +-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). +next_packet_id(PacketId) when PacketId >= 4294967295 -> + 1; +next_packet_id(PacketId) -> + PacketId + 1. + +-spec json_result(Id :: integer(), Result :: term()) -> binary(). +json_result(Id, Result) when is_integer(Id) -> + Response = #{ + <<"id">> => Id, + <<"result">> => Result + }, + jiffy:encode(Response, [force_utf8]). + +-spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). +json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> + Response = #{ + <<"id">> => Id, + <<"error">> => #{ + <<"code">> => Code, + <<"message">> => Message + } + }, + jiffy:encode(Response, [force_utf8]). \ No newline at end of file diff --git a/apps/efka/src/tcp_server/tcp_channel.erl b/apps/efka/src/tcp_server/tcp_channel.erl index 4247916..f37ce4c 100644 --- a/apps/efka/src/tcp_server/tcp_channel.erl +++ b/apps/efka/src/tcp_server/tcp_channel.erl @@ -14,7 +14,6 @@ %% API -export([start_link/1]). --export([push_config/4, invoke/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -40,14 +39,6 @@ %%% API %%%=================================================================== --spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). -push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> - gen_server:cast(ChannelPid, {push_config, Ref, ReceiverPid, ConfigJson}). - --spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). -invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> - gen_server:cast(ChannelPid, {invoke, Ref, ReceiverPid, Payload}). - %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Socket :: gen_tcp:socket()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -87,24 +78,6 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 推送配置项目 -handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, - Packet = jiffy:encode(PushConfig, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), - {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; - -%% 远程调用 -handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}}, - Packet = jiffy:encode(PushConfig, [force_utf8]), - ok = gen_tcp:send(Socket, <>), - - erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), - {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; - handle_cast(_Request, State = #state{}) -> {noreply, State}. @@ -114,6 +87,25 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). + +%% 推送配置项目 +handle_info({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, + Packet = jiffy:encode(PushConfig, [force_utf8]), + ok = gen_tcp:send(Socket, <>), + + erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), + {noreply, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + +%% 远程调用 +handle_info({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}}, + Packet = jiffy:encode(PushConfig, [force_utf8]), + ok = gen_tcp:send(Socket, <>), + + erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), + {noreply, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + %% 处理micro-client:request => efka 主动的请求 handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> Request = jiffy:decode(Data, [return_maps]), @@ -208,20 +200,20 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := case efka_service:get_pid(ServiceId) of undefined -> lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), - Packet = json_error(Id, -1, <<"service not running">>), + Packet = gen_channel:json_error(Id, -1, <<"service not running">>), ok = gen_tcp:send(Socket, <>), {stop, normal, State}; ServicePid when is_pid(ServicePid) -> case efka_service:attach_channel(ServicePid, self()) of ok -> - Packet = json_result(Id, <<"ok">>), + Packet = gen_channel:json_result(Id, <<"ok">>), erlang:monitor(process, ServicePid), ok = gen_tcp:send(Socket, <>), {ok, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), - Packet = json_error(Id, -1, Error), + Packet = gen_channel:json_error(Id, -1, Error), ok = gen_tcp:send(Socket, <>), {stop, normal, State} end @@ -230,7 +222,7 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := %% 请求参数 handle_request(#{<<"id">> := Id, <<"method">> := <<"request_config">>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> {ok, ConfigJson} = efka_service:request_config(ServicePid), - Packet = json_result(Id, ConfigJson), + Packet = gen_channel:json_result(Id, ConfigJson), ok = gen_tcp:send(Socket, <>), {ok, State}; @@ -248,30 +240,4 @@ handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{< %% 订阅事件 handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) -> efka_subscription:subscribe(Topic, self()), - {ok, State}. - -%% 采用32位编码 --spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). -next_packet_id(PacketId) when PacketId >= 4294967295 -> - 1; -next_packet_id(PacketId) -> - PacketId + 1. - --spec json_result(Id :: integer(), Result :: term()) -> binary(). -json_result(Id, Result) when is_integer(Id) -> - Response = #{ - <<"id">> => Id, - <<"result">> => Result - }, - jiffy:encode(Response, [force_utf8]). - --spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). -json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> - Response = #{ - <<"id">> => Id, - <<"error">> => #{ - <<"code">> => Code, - <<"message">> => Message - } - }, - jiffy:encode(Response, [force_utf8]). \ No newline at end of file + {ok, State}. \ No newline at end of file diff --git a/apps/efka/src/websocket_server/ws_channel.erl b/apps/efka/src/websocket_server/ws_channel.erl index aed9d43..2c25691 100644 --- a/apps/efka/src/websocket_server/ws_channel.erl +++ b/apps/efka/src/websocket_server/ws_channel.erl @@ -13,7 +13,6 @@ %% API -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). --export([push_config/4, invoke/4]). %% 最大的等待时间 -define(PENDING_TIMEOUT, 10 * 1000). @@ -28,14 +27,6 @@ inflight = #{} }). --spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). -push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> - ChannelPid ! {push_config, Ref, ReceiverPid, ConfigJson}. - --spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). -invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> - ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 逻辑处理方法 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -79,14 +70,13 @@ websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. - websocket_info({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{packet_id = PacketId, inflight = Inflight}) -> PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, Packet = jiffy:encode(PushConfig, [force_utf8]), Reply = <>, erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), - {reply, {binary, Reply}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + {reply, {binary, Reply}, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; %% 远程调用 websocket_info({invoke, Ref, ReceiverPid, Payload}, State = #state{packet_id = PacketId, inflight = Inflight}) -> @@ -95,7 +85,7 @@ websocket_info({invoke, Ref, ReceiverPid, Payload}, State = #state{packet_id = P Reply = <>, erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), - {reply, {binary, Reply}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + {reply, {binary, Reply}, State#state{packet_id = gen_channel:next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; %% 订阅的消息 websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> @@ -150,21 +140,21 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := case efka_service:get_pid(ServiceId) of undefined -> lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), - Packet = json_error(Id, -1, <<"service not running">>), + Packet = gen_channel:json_error(Id, -1, <<"service not running">>), Reply = <>, delay_stop(10, normal), {reply, {binary, Reply}, State}; ServicePid when is_pid(ServicePid) -> case efka_service:attach_channel(ServicePid, self()) of ok -> - Packet = json_result(Id, <<"ok">>), + Packet = gen_channel:json_result(Id, <<"ok">>), erlang:monitor(process, ServicePid), Reply = <>, {reply, {binary, Reply}, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), - Packet = json_error(Id, -1, Error), + Packet = gen_channel:json_error(Id, -1, Error), Reply = <>, delay_stop(10, normal), {reply, {binary, Reply}, State} @@ -174,7 +164,7 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := %% 请求参数 handle_request(#{<<"id">> := Id, <<"method">> := <<"request_config">>}, State = #state{service_pid = ServicePid, is_registered = true}) -> {ok, ConfigJson} = efka_service:request_config(ServicePid), - Packet = json_result(Id, ConfigJson), + Packet = gen_channel:json_result(Id, ConfigJson), Reply = <>, {reply, {binary, Reply}, State}; @@ -194,31 +184,5 @@ handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := efka_subscription:subscribe(Topic, self()), {ok, State}. -%% 采用32位编码 --spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). -next_packet_id(PacketId) when PacketId >= 4294967295 -> - 1; -next_packet_id(PacketId) -> - PacketId + 1. - --spec json_result(Id :: integer(), Result :: term()) -> binary(). -json_result(Id, Result) when is_integer(Id) -> - Response = #{ - <<"id">> => Id, - <<"result">> => Result - }, - jiffy:encode(Response, [force_utf8]). - --spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). -json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> - Response = #{ - <<"id">> => Id, - <<"error">> => #{ - <<"code">> => Code, - <<"message">> => Message - } - }, - jiffy:encode(Response, [force_utf8]). - delay_stop(Timeout, Reason) when is_integer(Timeout) -> erlang:start_timer(Timeout, self(), {stop, Reason}). \ No newline at end of file