Compare commits

...

3 Commits

Author SHA1 Message Date
e886dce82d fix agent 2025-04-20 19:31:36 +08:00
2bfc195c56 fix 2025-04-20 19:08:20 +08:00
d4fbd14b91 add efka agent 2025-04-20 18:48:24 +08:00
7 changed files with 33 additions and 80 deletions

View File

@ -5,15 +5,13 @@
{mod, {efka_app, []}}, {mod, {efka_app, []}},
{applications, {applications,
[ [
sync, % sync,
hackney, hackney,
lager, lager,
esockd, esockd,
jiffy, %jiffy,
gpb, %gpb,
cowlib, %mnesia,
gun,
mnesia,
crypto, crypto,
ssl, ssl,
public_key, public_key,

View File

@ -4,7 +4,7 @@
%%% @doc %%% @doc
%%% %%%
%%% @end %%% @end
%%% Created : 20. 4 2025 16:25 %%% Created : 20. 4 2025 18:47
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(efka_agent). -module(efka_agent).
-author("anlicheng"). -author("anlicheng").
@ -19,15 +19,10 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(DISCONNECTED, 0).
-define(CONNECTING, 1).
-define(CONNECTED, 2).
-record(state, { -record(state, {
conn_pid :: pid() | undefined,
host :: string(), host :: string(),
port :: integer(), port :: integer(),
status = ?DISCONNECTED socket
}). }).
%%%=================================================================== %%%===================================================================
@ -50,11 +45,10 @@ start_link() ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([]) -> init([]) ->
{ok, Props} = application:get_env(efka, wss_server), {ok, Props} = application:get_env(efka, tls_server),
Host = proplists:get_value(host, Props), Host = proplists:get_value(host, Props),
Port = proplists:get_value(port, Props), Port = proplists:get_value(port, Props),
%%
erlang:start_timer(0, self(), create_connection), erlang:start_timer(0, self(), create_connection),
{ok, #state{host = Host, port = Port}}. {ok, #state{host = Host, port = Port}}.
@ -62,7 +56,7 @@ init([]) ->
%% @private %% @private
%% @doc Handling call messages %% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, -spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) -> State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} | {reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
@ -89,38 +83,27 @@ handle_cast(_Request, State = #state{}) ->
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_connection}, State = #state{host = Host, port = Port}) -> handle_info({timeout, _, create_connection}, State = #state{host = Host, port = Port}) ->
case connect(Host, Port) of case connect(Host, Port) of
{ok, ConnPid} -> {ok, Socket} ->
{noreply, State#state{conn_pid = ConnPid, status = ?CONNECTING}}; {noreply, State#state{socket = Socket}};
{error, Reason} -> {error, Reason} ->
lager:debug("[efka_agent] connect host: ~p, port: ~p, get error: ~p", [Host, Port, Reason]), lager:debug("[efka_agent] create_connection get error: ~p", [Reason]),
retry_connect(), retry_connect(),
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}} {noreply, State#state{socket = undefined}}
end; end;
%% upgrade逻辑处理 handle_info({ssl, Socket, Data}, State = #state{socket = Socket}) ->
handle_info({gun_upgrade, ConnPid, _StreamRef, [<<"websocket">>], Headers}, State = #state{conn_pid = ConnPid}) -> lager:debug("[efka_agent] socket get message: ~p", [Data]),
lager:debug("[efka_agent] upgrade success, headers: ~p", [Headers]),
{noreply, State#state{status = ?CONNECTED}};
handle_info({gun_response, ConnPid, _, _, Status, Headers}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] upgrade failed, status: ~p, headers: ~p", [Status, Headers]),
retry_connect(), retry_connect(),
gun:close(ConnPid), {noreply, State#state{socket = undefined}};
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
%% handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
handle_info({gun_ws, ConnPid, {binary, Bin}}, State = #state{conn_pid = ConnPid}) -> lager:warning("[efka_agent] socket close with reason: ~p", [Reason]),
lager:debug("[efka_agent] get binary message: ~p", [Bin]),
{noreply, State};
handle_info({gun_ws, ConnPid, {text, Msg}}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] get text message: ~p", [Msg]),
{noreply, State};
%% websocket链接关闭
handle_info({gun_ws, ConnPid, close}, State = #state{conn_pid = ConnPid}) ->
lager:debug("[efka_agent] ws close"),
retry_connect(), retry_connect(),
{noreply, State#state{conn_pid = undefined}}; {noreply, State#state{socket = undefined}};
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
lager:warning("[efka_agent] socket closed"),
retry_connect(),
{noreply, State#state{socket = undefined}};
handle_info(_Info, State = #state{}) -> handle_info(_Info, State = #state{}) ->
{noreply, State}. {noreply, State}.
@ -131,14 +114,14 @@ handle_info(_Info, State = #state{}) ->
%% necessary cleaning up. When it returns, the gen_server terminates %% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored. %% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()). State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) -> terminate(_Reason, _State = #state{}) ->
ok. ok.
%% @private %% @private
%% @doc Convert process state when code is changed %% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, -spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) -> Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}). {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) -> code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}. {ok, State}.
@ -150,28 +133,9 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
retry_connect() -> retry_connect() ->
erlang:start_timer(5000, self(), create_connection). erlang:start_timer(5000, self(), create_connection).
-spec connect(Host :: string(), Port :: integer()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
connect(Host, Port) when is_list(Host), is_integer(Port) -> connect(Host, Port) when is_list(Host), is_integer(Port) ->
Opts = #{ SslOptions = [
protocols => [http], {verify, verify_none},
transport => tcp {active, true}
}, ],
case gun:open(Host, Port, Opts) of ssl:connect(Host, Port, SslOptions, 5000).
{ok, ConnPid} ->
%%
receive
{gun_up, ConnPid, http} ->
%% WebSocket
gun:ws_upgrade(ConnPid, "/", [
{<<"connection">>, <<"upgrade">>},
{<<"upgrade">>, <<"websocket">>},
{<<"sec-websocket-version">>, <<"13">>},
{<<"sec-websocket-key">>, base64:encode(crypto:strong_rand_bytes(16))}
]),
{ok, ConnPid}
after 5000 ->
{error, connect_timeout}
end;
{error, Reason} ->
{error, Reason}
end.

View File

@ -12,7 +12,7 @@
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
io:setopts([{encoding, unicode}]), io:setopts([{encoding, unicode}]),
%% mnesia数据库 %% mnesia数据库
mnesia:start(), % mnesia:start(),
%% %%
erlang:system_flag(fullsweep_after, 16), erlang:system_flag(fullsweep_after, 16),

View File

@ -2,7 +2,7 @@
{efka, [ {efka, [
{root_dir, "/tmp/efka/"}, {root_dir, "/tmp/efka/"},
{wss_server, [ {tls_server, [
{host, "localhost"}, {host, "localhost"},
{port, 18080} {port, 18080}
]} ]}

View File

@ -3,4 +3,4 @@
-setcookie efka_cookie -setcookie efka_cookie
+K true +K true
+A30 +A 5

View File

@ -5,7 +5,6 @@
{esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.8.0"}}}, {esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.8.0"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}},
{gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}, {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}},
{gun, ".*", {git, "https://github.com/ninenines/gun.git", {tag, "2.2.0"}}},
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
]}. ]}.

View File

@ -1,9 +1,5 @@
{"1.2.0", {"1.2.0",
[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.3">>},1}, [{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.3">>},1},
{<<"cowlib">>,
{git,"https://github.com/ninenines/cowlib",
{ref,"f8d0ad7f19b5dddd33cbfb089ebd2e2be2a81a5d"}},
1},
{<<"esockd">>, {<<"esockd">>,
{git,"https://github.com/emqx/esockd.git", {git,"https://github.com/emqx/esockd.git",
{ref,"9b959fc11a1c398a589892f335235be6c5b4a454"}}, {ref,"9b959fc11a1c398a589892f335235be6c5b4a454"}},
@ -14,10 +10,6 @@
{git,"https://github.com/tomas-abrahamsson/gpb.git", {git,"https://github.com/tomas-abrahamsson/gpb.git",
{ref,"edda1006d863a09509673778c455d33d88e6edbc"}}, {ref,"edda1006d863a09509673778c455d33d88e6edbc"}},
0}, 0},
{<<"gun">>,
{git,"https://github.com/ninenines/gun.git",
{ref,"627b8f9ed65da255afaddd166b1b9d102e0fa512"}},
0},
{<<"hackney">>, {<<"hackney">>,
{git,"https://github.com/benoitc/hackney.git", {git,"https://github.com/benoitc/hackney.git",
{ref,"f642ee0eccaff9aa15707ae3a3effa29f2920e41"}}, {ref,"f642ee0eccaff9aa15707ae3a3effa29f2920e41"}},