add efka agent

This commit is contained in:
anlicheng 2025-04-20 18:48:24 +08:00
parent 9d473d26fb
commit d4fbd14b91
3 changed files with 7 additions and 90 deletions

View File

@ -11,8 +11,6 @@
esockd,
jiffy,
gpb,
cowlib,
gun,
mnesia,
crypto,
ssl,

View File

@ -4,7 +4,7 @@
%%% @doc
%%%
%%% @end
%%% Created : 20. 4 2025 16:25
%%% Created : 20. 4 2025 18:47
%%%-------------------------------------------------------------------
-module(efka_agent).
-author("anlicheng").
@ -19,16 +19,7 @@
-define(SERVER, ?MODULE).
-define(DISCONNECTED, 0).
-define(CONNECTING, 1).
-define(CONNECTED, 2).
-record(state, {
conn_pid :: pid() | undefined,
host :: string(),
port :: integer(),
status = ?DISCONNECTED
}).
-record(state, {}).
%%%===================================================================
%%% API
@ -50,14 +41,7 @@ start_link() ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, Props} = application:get_env(efka, wss_server),
Host = proplists:get_value(host, Props),
Port = proplists:get_value(port, Props),
%%
erlang:start_timer(0, self(), create_connection),
{ok, #state{host = Host, port = Port}}.
{ok, #state{}}.
%% @private
%% @doc Handling call messages
@ -87,41 +71,6 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_connection}, State = #state{host = Host, port = Port}) ->
case connect(Host, Port) of
{ok, ConnPid} ->
{noreply, State#state{conn_pid = ConnPid, status = ?CONNECTING}};
{error, Reason} ->
lager:debug("[efka_agent] connect host: ~p, port: ~p, get error: ~p", [Host, Port, Reason]),
retry_connect(),
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}
end;
%% upgrade逻辑处理
handle_info({gun_upgrade, ConnPid, _StreamRef, [<<"websocket">>], Headers}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] upgrade success, headers: ~p", [Headers]),
{noreply, State#state{status = ?CONNECTED}};
handle_info({gun_response, ConnPid, _, _, Status, Headers}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] upgrade failed, status: ~p, headers: ~p", [Status, Headers]),
retry_connect(),
gun:close(ConnPid),
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
%%
handle_info({gun_ws, ConnPid, {binary, Bin}}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] get binary message: ~p", [Bin]),
{noreply, State};
handle_info({gun_ws, ConnPid, {text, Msg}}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] get text message: ~p", [Msg]),
{noreply, State};
%% websocket链接关闭
handle_info({gun_ws, ConnPid, close}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] ws close"),
retry_connect(),
{noreply, State#state{conn_pid = undefined}};
handle_info(_Info, State = #state{}) ->
{noreply, State}.
@ -146,32 +95,3 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
retry_connect() ->
erlang:start_timer(5000, self(), create_connection).
-spec connect(Host :: string(), Port :: integer()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
connect(Host, Port) when is_list(Host), is_integer(Port) ->
Opts = #{
protocols => [http],
transport => tcp
},
case gun:open(Host, Port, Opts) of
{ok, ConnPid} ->
%%
receive
{gun_up, ConnPid, http} ->
%% WebSocket
gun:ws_upgrade(ConnPid, "/", [
{<<"connection">>, <<"upgrade">>},
{<<"upgrade">>, <<"websocket">>},
{<<"sec-websocket-version">>, <<"13">>},
{<<"sec-websocket-key">>, base64:encode(crypto:strong_rand_bytes(16))}
]),
{ok, ConnPid}
after 5000 ->
{error, connect_timeout}
end;
{error, Reason} ->
{error, Reason}
end.

View File

@ -5,7 +5,6 @@
{esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.8.0"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}},
{gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}},
{gun, ".*", {git, "https://github.com/ninenines/gun.git", {tag, "2.2.0"}}},
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
]}.