diff --git a/apps/efka/src/efka.app.src b/apps/efka/src/efka.app.src index 4415217..32fda82 100644 --- a/apps/efka/src/efka.app.src +++ b/apps/efka/src/efka.app.src @@ -11,8 +11,6 @@ esockd, jiffy, gpb, - cowlib, - gun, mnesia, crypto, ssl, diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index e2f4463..3d7d23e 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -4,7 +4,7 @@ %%% @doc %%% %%% @end -%%% Created : 20. 4月 2025 16:25 +%%% Created : 20. 4月 2025 18:47 %%%------------------------------------------------------------------- -module(efka_agent). -author("anlicheng"). @@ -17,18 +17,9 @@ %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). --define(DISCONNECTED, 0). --define(CONNECTING, 1). --define(CONNECTED, 2). - --record(state, { - conn_pid :: pid() | undefined, - host :: string(), - port :: integer(), - status = ?DISCONNECTED -}). +-record(state, {}). %%%=================================================================== %%% API @@ -50,19 +41,12 @@ start_link() -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([]) -> - {ok, Props} = application:get_env(efka, wss_server), - Host = proplists:get_value(host, Props), - Port = proplists:get_value(port, Props), - - %% 异步建立到服务器的链接 - erlang:start_timer(0, self(), create_connection), - - {ok, #state{host = Host, port = Port}}. + {ok, #state{}}. %% @private %% @doc Handling call messages -spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> + State :: #state{}) -> {reply, Reply :: term(), NewState :: #state{}} | {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}} | @@ -87,41 +71,6 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, create_connection}, State = #state{host = Host, port = Port}) -> - case connect(Host, Port) of - {ok, ConnPid} -> - {noreply, State#state{conn_pid = ConnPid, status = ?CONNECTING}}; - {error, Reason} -> - lager:debug("[efka_agent] connect host: ~p, port: ~p, get error: ~p", [Host, Port, Reason]), - retry_connect(), - {noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}} - end; - -%% upgrade逻辑处理 -handle_info({gun_upgrade, ConnPid, _StreamRef, [<<"websocket">>], Headers}, State = #state{conn_pid = ConnPid}) -> - lager:debug("[efka_agent] upgrade success, headers: ~p", [Headers]), - {noreply, State#state{status = ?CONNECTED}}; -handle_info({gun_response, ConnPid, _, _, Status, Headers}, State = #state{conn_pid = ConnPid}) -> - lager:debug("[efka_agent] upgrade failed, status: ~p, headers: ~p", [Status, Headers]), - retry_connect(), - gun:close(ConnPid), - {noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}; - -%% 处理接受到的消息 -handle_info({gun_ws, ConnPid, {binary, Bin}}, State = #state{conn_pid = ConnPid}) -> - lager:debug("[efka_agent] get binary message: ~p", [Bin]), - {noreply, State}; - -handle_info({gun_ws, ConnPid, {text, Msg}}, State = #state{conn_pid = ConnPid}) -> - lager:debug("[efka_agent] get text message: ~p", [Msg]), - {noreply, State}; - -%% websocket链接关闭 -handle_info({gun_ws, ConnPid, close}, State = #state{conn_pid = ConnPid}) -> - lager:debug("[efka_agent] ws close"), - retry_connect(), - {noreply, State#state{conn_pid = undefined}}; - handle_info(_Info, State = #state{}) -> {noreply, State}. @@ -131,14 +80,14 @@ handle_info(_Info, State = #state{}) -> %% necessary cleaning up. When it returns, the gen_server terminates %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). + State :: #state{}) -> term()). terminate(_Reason, _State = #state{}) -> ok. %% @private %% @doc Convert process state when code is changed -spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> + Extra :: term()) -> {ok, NewState :: #state{}} | {error, Reason :: term()}). code_change(_OldVsn, State = #state{}, _Extra) -> {ok, State}. @@ -146,32 +95,3 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== - -retry_connect() -> - erlang:start_timer(5000, self(), create_connection). - --spec connect(Host :: string(), Port :: integer()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. -connect(Host, Port) when is_list(Host), is_integer(Port) -> - Opts = #{ - protocols => [http], - transport => tcp - }, - case gun:open(Host, Port, Opts) of - {ok, ConnPid} -> - %% 等待连接建立 - receive - {gun_up, ConnPid, http} -> - %% 发起 WebSocket 握手请求 - gun:ws_upgrade(ConnPid, "/", [ - {<<"connection">>, <<"upgrade">>}, - {<<"upgrade">>, <<"websocket">>}, - {<<"sec-websocket-version">>, <<"13">>}, - {<<"sec-websocket-key">>, base64:encode(crypto:strong_rand_bytes(16))} - ]), - {ok, ConnPid} - after 5000 -> - {error, connect_timeout} - end; - {error, Reason} -> - {error, Reason} - end. diff --git a/rebar.config b/rebar.config index 5de6a1a..32a9019 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,6 @@ {esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.8.0"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}}, {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}, - {gun, ".*", {git, "https://github.com/ninenines/gun.git", {tag, "2.2.0"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}.