Compare commits
No commits in common. "e886dce82d0e3415223553ac879d18cbaab1cb71" and "9d473d26fb156be90121deb591b54d8cabeebf85" have entirely different histories.
e886dce82d
...
9d473d26fb
@ -5,13 +5,15 @@
|
|||||||
{mod, {efka_app, []}},
|
{mod, {efka_app, []}},
|
||||||
{applications,
|
{applications,
|
||||||
[
|
[
|
||||||
% sync,
|
sync,
|
||||||
hackney,
|
hackney,
|
||||||
lager,
|
lager,
|
||||||
esockd,
|
esockd,
|
||||||
%jiffy,
|
jiffy,
|
||||||
%gpb,
|
gpb,
|
||||||
%mnesia,
|
cowlib,
|
||||||
|
gun,
|
||||||
|
mnesia,
|
||||||
crypto,
|
crypto,
|
||||||
ssl,
|
ssl,
|
||||||
public_key,
|
public_key,
|
||||||
|
|||||||
@ -4,7 +4,7 @@
|
|||||||
%%% @doc
|
%%% @doc
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%% Created : 20. 4月 2025 18:47
|
%%% Created : 20. 4月 2025 16:25
|
||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
-module(efka_agent).
|
-module(efka_agent).
|
||||||
-author("anlicheng").
|
-author("anlicheng").
|
||||||
@ -19,10 +19,15 @@
|
|||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-define(DISCONNECTED, 0).
|
||||||
|
-define(CONNECTING, 1).
|
||||||
|
-define(CONNECTED, 2).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
conn_pid :: pid() | undefined,
|
||||||
host :: string(),
|
host :: string(),
|
||||||
port :: integer(),
|
port :: integer(),
|
||||||
socket
|
status = ?DISCONNECTED
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -45,10 +50,11 @@ start_link() ->
|
|||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, Props} = application:get_env(efka, tls_server),
|
{ok, Props} = application:get_env(efka, wss_server),
|
||||||
Host = proplists:get_value(host, Props),
|
Host = proplists:get_value(host, Props),
|
||||||
Port = proplists:get_value(port, Props),
|
Port = proplists:get_value(port, Props),
|
||||||
|
|
||||||
|
%% 异步建立到服务器的链接
|
||||||
erlang:start_timer(0, self(), create_connection),
|
erlang:start_timer(0, self(), create_connection),
|
||||||
|
|
||||||
{ok, #state{host = Host, port = Port}}.
|
{ok, #state{host = Host, port = Port}}.
|
||||||
@ -56,7 +62,7 @@ init([]) ->
|
|||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
State :: #state{}) ->
|
State :: #state{}) ->
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
@ -83,27 +89,38 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, create_connection}, State = #state{host = Host, port = Port}) ->
|
handle_info({timeout, _, create_connection}, State = #state{host = Host, port = Port}) ->
|
||||||
case connect(Host, Port) of
|
case connect(Host, Port) of
|
||||||
{ok, Socket} ->
|
{ok, ConnPid} ->
|
||||||
{noreply, State#state{socket = Socket}};
|
{noreply, State#state{conn_pid = ConnPid, status = ?CONNECTING}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_agent] create_connection get error: ~p", [Reason]),
|
lager:debug("[efka_agent] connect host: ~p, port: ~p, get error: ~p", [Host, Port, Reason]),
|
||||||
retry_connect(),
|
retry_connect(),
|
||||||
{noreply, State#state{socket = undefined}}
|
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({ssl, Socket, Data}, State = #state{socket = Socket}) ->
|
%% upgrade逻辑处理
|
||||||
lager:debug("[efka_agent] socket get message: ~p", [Data]),
|
handle_info({gun_upgrade, ConnPid, _StreamRef, [<<"websocket">>], Headers}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
lager:debug("[efka_agent] upgrade success, headers: ~p", [Headers]),
|
||||||
|
{noreply, State#state{status = ?CONNECTED}};
|
||||||
|
handle_info({gun_response, ConnPid, _, _, Status, Headers}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
lager:debug("[efka_agent] upgrade failed, status: ~p, headers: ~p", [Status, Headers]),
|
||||||
retry_connect(),
|
retry_connect(),
|
||||||
{noreply, State#state{socket = undefined}};
|
gun:close(ConnPid),
|
||||||
|
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
||||||
|
|
||||||
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
|
%% 处理接受到的消息
|
||||||
lager:warning("[efka_agent] socket close with reason: ~p", [Reason]),
|
handle_info({gun_ws, ConnPid, {binary, Bin}}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
lager:debug("[efka_agent] get binary message: ~p", [Bin]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({gun_ws, ConnPid, {text, Msg}}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
lager:debug("[efka_agent] get text message: ~p", [Msg]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
%% websocket链接关闭
|
||||||
|
handle_info({gun_ws, ConnPid, close}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
lager:debug("[efka_agent] ws close"),
|
||||||
retry_connect(),
|
retry_connect(),
|
||||||
{noreply, State#state{socket = undefined}};
|
{noreply, State#state{conn_pid = undefined}};
|
||||||
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
|
|
||||||
lager:warning("[efka_agent] socket closed"),
|
|
||||||
retry_connect(),
|
|
||||||
{noreply, State#state{socket = undefined}};
|
|
||||||
|
|
||||||
handle_info(_Info, State = #state{}) ->
|
handle_info(_Info, State = #state{}) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
@ -114,14 +131,14 @@ handle_info(_Info, State = #state{}) ->
|
|||||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
%% with Reason. The return value is ignored.
|
%% with Reason. The return value is ignored.
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
State :: #state{}) -> term()).
|
State :: #state{}) -> term()).
|
||||||
terminate(_Reason, _State = #state{}) ->
|
terminate(_Reason, _State = #state{}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Convert process state when code is changed
|
%% @doc Convert process state when code is changed
|
||||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
Extra :: term()) ->
|
Extra :: term()) ->
|
||||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
@ -133,9 +150,28 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
retry_connect() ->
|
retry_connect() ->
|
||||||
erlang:start_timer(5000, self(), create_connection).
|
erlang:start_timer(5000, self(), create_connection).
|
||||||
|
|
||||||
|
-spec connect(Host :: string(), Port :: integer()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
|
||||||
connect(Host, Port) when is_list(Host), is_integer(Port) ->
|
connect(Host, Port) when is_list(Host), is_integer(Port) ->
|
||||||
SslOptions = [
|
Opts = #{
|
||||||
{verify, verify_none},
|
protocols => [http],
|
||||||
{active, true}
|
transport => tcp
|
||||||
],
|
},
|
||||||
ssl:connect(Host, Port, SslOptions, 5000).
|
case gun:open(Host, Port, Opts) of
|
||||||
|
{ok, ConnPid} ->
|
||||||
|
%% 等待连接建立
|
||||||
|
receive
|
||||||
|
{gun_up, ConnPid, http} ->
|
||||||
|
%% 发起 WebSocket 握手请求
|
||||||
|
gun:ws_upgrade(ConnPid, "/", [
|
||||||
|
{<<"connection">>, <<"upgrade">>},
|
||||||
|
{<<"upgrade">>, <<"websocket">>},
|
||||||
|
{<<"sec-websocket-version">>, <<"13">>},
|
||||||
|
{<<"sec-websocket-key">>, base64:encode(crypto:strong_rand_bytes(16))}
|
||||||
|
]),
|
||||||
|
{ok, ConnPid}
|
||||||
|
after 5000 ->
|
||||||
|
{error, connect_timeout}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
io:setopts([{encoding, unicode}]),
|
io:setopts([{encoding, unicode}]),
|
||||||
%% 启动mnesia数据库
|
%% 启动mnesia数据库
|
||||||
% mnesia:start(),
|
mnesia:start(),
|
||||||
%% 加速内存的回收
|
%% 加速内存的回收
|
||||||
erlang:system_flag(fullsweep_after, 16),
|
erlang:system_flag(fullsweep_after, 16),
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
{efka, [
|
{efka, [
|
||||||
{root_dir, "/tmp/efka/"},
|
{root_dir, "/tmp/efka/"},
|
||||||
|
|
||||||
{tls_server, [
|
{wss_server, [
|
||||||
{host, "localhost"},
|
{host, "localhost"},
|
||||||
{port, 18080}
|
{port, 18080}
|
||||||
]}
|
]}
|
||||||
|
|||||||
@ -3,4 +3,4 @@
|
|||||||
-setcookie efka_cookie
|
-setcookie efka_cookie
|
||||||
|
|
||||||
+K true
|
+K true
|
||||||
+A 5
|
+A30
|
||||||
|
|||||||
@ -5,6 +5,7 @@
|
|||||||
{esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.8.0"}}},
|
{esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.8.0"}}},
|
||||||
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}},
|
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}},
|
||||||
{gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}},
|
{gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}},
|
||||||
|
{gun, ".*", {git, "https://github.com/ninenines/gun.git", {tag, "2.2.0"}}},
|
||||||
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
|
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
|
||||||
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
|
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|||||||
@ -1,5 +1,9 @@
|
|||||||
{"1.2.0",
|
{"1.2.0",
|
||||||
[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.3">>},1},
|
[{<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.3">>},1},
|
||||||
|
{<<"cowlib">>,
|
||||||
|
{git,"https://github.com/ninenines/cowlib",
|
||||||
|
{ref,"f8d0ad7f19b5dddd33cbfb089ebd2e2be2a81a5d"}},
|
||||||
|
1},
|
||||||
{<<"esockd">>,
|
{<<"esockd">>,
|
||||||
{git,"https://github.com/emqx/esockd.git",
|
{git,"https://github.com/emqx/esockd.git",
|
||||||
{ref,"9b959fc11a1c398a589892f335235be6c5b4a454"}},
|
{ref,"9b959fc11a1c398a589892f335235be6c5b4a454"}},
|
||||||
@ -10,6 +14,10 @@
|
|||||||
{git,"https://github.com/tomas-abrahamsson/gpb.git",
|
{git,"https://github.com/tomas-abrahamsson/gpb.git",
|
||||||
{ref,"edda1006d863a09509673778c455d33d88e6edbc"}},
|
{ref,"edda1006d863a09509673778c455d33d88e6edbc"}},
|
||||||
0},
|
0},
|
||||||
|
{<<"gun">>,
|
||||||
|
{git,"https://github.com/ninenines/gun.git",
|
||||||
|
{ref,"627b8f9ed65da255afaddd166b1b9d102e0fa512"}},
|
||||||
|
0},
|
||||||
{<<"hackney">>,
|
{<<"hackney">>,
|
||||||
{git,"https://github.com/benoitc/hackney.git",
|
{git,"https://github.com/benoitc/hackney.git",
|
||||||
{ref,"f642ee0eccaff9aa15707ae3a3effa29f2920e41"}},
|
{ref,"f642ee0eccaff9aa15707ae3a3effa29f2920e41"}},
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user