From 3ab07c4fe338c3fad77ed9178815d2196249c5d5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 20 May 2025 15:14:54 +0800 Subject: [PATCH] fix client broker --- apps/efka/src/client/efka_client.erl | 6 +- apps/efka/src/client/efka_client_broker.erl | 108 ++++++++++++++++++++ apps/efka/src/efka_service.erl | 2 +- 3 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 apps/efka/src/client/efka_client_broker.erl diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index ba9f36b..a6cf6ca 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -45,7 +45,7 @@ }). test() -> - start_link(<<"test">>, "localhost", 18080). + start_link(<<"service1234">>, "localhost", 18088). -spec controller_process(ControllerPid :: pid()) -> ok. controller_process(ControllerPid) when is_pid(ControllerPid) -> @@ -251,7 +251,7 @@ handle_info({tcp, Socket, <>}, State = #state{socke case jiffy:decode(Packet, [return_maps]) of #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson}} -> Ref = make_ref(), - ControllerPid ! {push_config, Ref, ConfigJson}, + ControllerPid ! {push_config, self(), Ref, ConfigJson}, Reply = receive {push_config_reply, Ref, ok} -> @@ -265,7 +265,7 @@ handle_info({tcp, Socket, <>}, State = #state{socke #{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} -> Ref = make_ref(), - ControllerPid ! {invoke, Ref, Payload}, + ControllerPid ! {invoke, self(), Ref, Payload}, Reply = receive {invoke_reply, Ref, {ok, Result}} -> diff --git a/apps/efka/src/client/efka_client_broker.erl b/apps/efka/src/client/efka_client_broker.erl new file mode 100644 index 0000000..5a36762 --- /dev/null +++ b/apps/efka/src/client/efka_client_broker.erl @@ -0,0 +1,108 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 20. 5月 2025 15:09 +%%%------------------------------------------------------------------- +-module(efka_client_broker). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + efka_client:start_link(<<"service1234">>, "localhost", 18088), + efka_client:controller_process(self()), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({push_config, ReceiverPid, Ref, Config}, State = #state{}) -> + lager:debug("[efka_client_broker] get push_config: ~p", [jiffy:decode(Config, [return_maps])]), + ReceiverPid ! {push_config_reply, Ref, ok}, + {noreply, State}; +handle_info({invoke, ReceiverPid, Ref, Payload}, State = #state{}) -> + lager:debug("[efka_client_broker] get invoke: ~p", [Payload]), + ReceiverPid ! {invoke_reply, Ref, ok}, + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 04f0b97..8c58075 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -163,7 +163,7 @@ handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{channel_ {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; false -> ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, - {reply, State} + {noreply, State} end; %% 推送配置项目