解决tcp_server的问题
This commit is contained in:
parent
067ead0f60
commit
7a9d18cf51
@ -6,9 +6,11 @@
|
||||
{applications,
|
||||
[
|
||||
sync,
|
||||
%jiffy,
|
||||
jiffy,
|
||||
%gpb,
|
||||
%mnesia,
|
||||
parse_trans,
|
||||
lager,
|
||||
crypto,
|
||||
inets,
|
||||
ssl,
|
||||
|
||||
437
apps/efka/src/efka_client.erl
Normal file
437
apps/efka/src/efka_client.erl
Normal file
@ -0,0 +1,437 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 28. 8月 2023 15:39
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(efka_client).
|
||||
-author("aresei").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% 消息包
|
||||
-record(efka_packet, {
|
||||
packet_id :: integer(),
|
||||
type :: integer(),
|
||||
message :: any()
|
||||
}).
|
||||
|
||||
%% 请求超时时间
|
||||
-define(EFKA_REQUEST_TIMEOUT, 5000).
|
||||
|
||||
%% 消息类型
|
||||
|
||||
%% 服务注册
|
||||
-define(PACKET_TYPE_REGISTER, 16).
|
||||
%% 上传数据
|
||||
-define(PACKET_TYPE_METRIC_DATA, 3).
|
||||
%% 调用其他微服务
|
||||
-define(PACKET_TYPE_INVOKE, 4).
|
||||
%% 消息响应
|
||||
-define(PACKET_TYPE_RESPONSE, 7).
|
||||
%% efka下发给微服务参数
|
||||
-define(PACKET_TYPE_PUSH_PARAM, 5).
|
||||
%% efka下发给微服务采集项
|
||||
-define(PACKET_TYPE_PUSH_METRIC, 6).
|
||||
%% 设备状态轮询: 增加日期: 2025-4-16
|
||||
-define(PACKET_TYPE_POLL, 20).
|
||||
%% 微服务给efka发送log消息
|
||||
-define(PACKET_TYPE_LOG, 9).
|
||||
%% 微服务从efka获取自身的采集项
|
||||
-define(PACKET_TYPE_REQUEST_METRIC, 10).
|
||||
%% 微服务从efka获取自身的参数
|
||||
-define(PACKET_TYPE_REQUEST_PARAM, 12).
|
||||
%% efka向微服务发送stream-call消息
|
||||
-define(PACKET_TYPE_PUSH_STREAM_CALL, 11).
|
||||
%% 微服务事件上报
|
||||
-define(PACKET_TYPE_EVENT, 15).
|
||||
|
||||
%% API
|
||||
-export([start_link/3]).
|
||||
-export([device_offline/1, device_online/1]).
|
||||
-export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2]).
|
||||
|
||||
-export([test/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {
|
||||
packet_id = 0 :: integer(),
|
||||
host :: string(),
|
||||
port :: integer(),
|
||||
%% 请求后未完成的请求
|
||||
inflight = #{} :: map(),
|
||||
socket :: gen_tcp:socket()
|
||||
}).
|
||||
|
||||
-spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}.
|
||||
send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) ->
|
||||
{ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}),
|
||||
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
|
||||
|
||||
-spec invoke_service(ToService :: binary(), Message :: map(), Timeout :: integer()) ->
|
||||
{ok, Result :: any()} | {error, Reason :: any()}.
|
||||
invoke_service(ToService, Message, Timeout) when is_binary(ToService), is_map(Message), is_integer(Timeout) ->
|
||||
{ok, Ref} = gen_server:call(?MODULE, {invoke_service, self(), ToService, Message, Timeout}),
|
||||
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
|
||||
|
||||
-spec send_log(Message :: binary() | map()) -> no_return().
|
||||
send_log(Message) when is_binary(Message); is_map(Message) ->
|
||||
gen_server:cast(?MODULE, {send_log, Message}).
|
||||
|
||||
%% efka_server为了统一,r对象为字符串;需要2次json_decode
|
||||
-spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}.
|
||||
request_metric() ->
|
||||
{ok, Ref} = gen_server:call(?MODULE, {request_metric, self()}),
|
||||
case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of
|
||||
{ok, Reply} ->
|
||||
{ok, jiffy:decode(Reply, [return_maps])};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec request_param() -> {ok, Result :: map()} | {error, Reason :: any()}.
|
||||
request_param() ->
|
||||
{ok, Ref} = gen_server:call(?MODULE, {request_param, self()}),
|
||||
case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of
|
||||
{ok, Reply} ->
|
||||
{ok, jiffy:decode(Reply, [return_maps])};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec device_offline(DeviceUUID :: binary()) -> no_return().
|
||||
device_offline(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}).
|
||||
|
||||
-spec device_online(DeviceUUID :: binary()) -> no_return().
|
||||
device_online(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}).
|
||||
|
||||
-spec send_event(EventType :: integer(), Params :: binary() | map()) -> no_return().
|
||||
send_event(EventType, Params) when is_integer(EventType), is_binary(Params); is_map(Params) ->
|
||||
gen_server:cast(?MODULE, {send_event, EventType, Params}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: any()} | {error, Reason :: any()}.
|
||||
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
||||
receive
|
||||
{response, Ref, {ok, Reply}} ->
|
||||
{ok, Reply};
|
||||
{response, Ref, {error, Reason}} ->
|
||||
{error, Reason}
|
||||
after
|
||||
Timeout ->
|
||||
{error, timeout}
|
||||
end.
|
||||
|
||||
test() ->
|
||||
start_link(<<"test">>, "localhost", 18080).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(RegisterName :: binary(), Host :: string(), Port :: integer()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host), is_integer(Port) ->
|
||||
gen_server:start_link(?MODULE, [RegisterName, Host, Port], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([RegisterName, Host, Port]) ->
|
||||
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
|
||||
ok = gen_tcp:controlling_process(Socket, self()),
|
||||
case do_register(RegisterName, Socket) of
|
||||
ok ->
|
||||
{ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}};
|
||||
{error, Reason} ->
|
||||
{stop, Reason}
|
||||
end.
|
||||
|
||||
%% 执行到efka服务器的注册
|
||||
do_register(RegisterName, Socket) ->
|
||||
PacketId = 0,
|
||||
Body = #{<<"name">> => RegisterName},
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_REGISTER, Body),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
lager:debug("[efka_client] will send packet: ~p", [Packet]),
|
||||
receive
|
||||
{tcp, Socket, Data} ->
|
||||
RegisterPacket = unpack(Data),
|
||||
lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]),
|
||||
case RegisterPacket of
|
||||
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} ->
|
||||
ok;
|
||||
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 0, <<"e">> := Error}} ->
|
||||
{error, Error};
|
||||
_ ->
|
||||
{error, invalid_register_packet}
|
||||
end
|
||||
after
|
||||
?EFKA_REQUEST_TIMEOUT ->
|
||||
{error, timeout}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||
Body = #{<<"data">> => Fields, <<"tag">> => Tags},
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_METRIC_DATA, Body),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
Ref = make_ref(),
|
||||
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
|
||||
|
||||
handle_call({invoke_service, ReceiverPid, ToService, Message, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||
Body = #{
|
||||
<<"to">> => ToService,
|
||||
<<"t">> => Timeout,
|
||||
<<"m">> => Message
|
||||
},
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_INVOKE, Body),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
Ref = make_ref(),
|
||||
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
|
||||
|
||||
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_METRIC),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
Ref = make_ref(),
|
||||
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
|
||||
|
||||
handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_PARAM),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
Ref = make_ref(),
|
||||
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({send_log, Message}, State = #state{socket = Socket, packet_id = PacketId}) ->
|
||||
Body = #{<<"l">> => Message},
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_LOG, Body),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
|
||||
{noreply, State#state{packet_id = next_packet_id(PacketId)}};
|
||||
|
||||
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
|
||||
Body = #{
|
||||
<<"event_type">> => EventType,
|
||||
<<"params">> => Params
|
||||
},
|
||||
Packet = pack(0, ?PACKET_TYPE_EVENT, Body),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
{noreply, State};
|
||||
|
||||
handle_cast(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({tcp, Socket, Data}, State = #state{socket = Socket}) ->
|
||||
Packet = unpack(Data),
|
||||
self() ! {handle_packet, Packet},
|
||||
{noreply, State};
|
||||
handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
|
||||
{stop, tcp_closed, State};
|
||||
|
||||
%% 收到请求的响应
|
||||
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = Message}}, State = #state{inflight = Inflight}) ->
|
||||
case maps:take(PacketId, Inflight) of
|
||||
error ->
|
||||
{noreply, State};
|
||||
{{Ref, ReceiverPid}, NInflight} ->
|
||||
case Message of
|
||||
#{<<"c">> := 1, <<"r">> := Result} ->
|
||||
ReceiverPid ! {response, Ref, {ok, Result}};
|
||||
#{<<"c">> := 0, <<"e">> := Error} ->
|
||||
ReceiverPid ! {response, Ref, {error, Error}}
|
||||
end,
|
||||
{noreply, State#state{inflight = NInflight}}
|
||||
end;
|
||||
|
||||
%% 收到efka推送的参数设置
|
||||
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_PARAM, message = Params}},
|
||||
State = #state{socket = Socket}) when is_map(Params) ->
|
||||
|
||||
Message = case handle_param(Params) of
|
||||
ok ->
|
||||
#{<<"c">> => 1, <<"r">> => <<"ok">>};
|
||||
{error, Reason} ->
|
||||
#{<<"c">> => 0, <<"e">> => Reason}
|
||||
end,
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
|
||||
{noreply, State};
|
||||
|
||||
%% 收到efka推送的采集项消息
|
||||
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_METRIC, message = Metrics}},
|
||||
State = #state{socket = Socket}) when is_list(Metrics) ->
|
||||
|
||||
Message = case handle_metric(Metrics) of
|
||||
ok ->
|
||||
#{<<"c">> => 1, <<"r">> => <<"ok">>};
|
||||
{error, Reason} when is_binary(Reason) ->
|
||||
#{<<"c">> => 0, <<"e">> => Reason}
|
||||
end,
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
|
||||
{noreply, State};
|
||||
|
||||
%% 收到设备状态的轮询请求
|
||||
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_POLL, message = Command}}, State = #state{socket = Socket}) ->
|
||||
Message = handle_poll_command(Command),
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
|
||||
{noreply, State};
|
||||
|
||||
%% 收到efka的stream-call消息
|
||||
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_STREAM_CALL,
|
||||
message = Msg = #{<<"service_name">> := ServiceName, <<"data">> := Data, <<"tag">> := Tag}}}, State = #state{socket = Socket}) ->
|
||||
|
||||
Message = case handle_stream_call(ServiceName, Data, Tag) of
|
||||
{continue, NewServiceName, NewData, NewTag} ->
|
||||
#{
|
||||
<<"c">> => 1,
|
||||
<<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag},
|
||||
<<"k">> => true
|
||||
};
|
||||
%% 处理到当前节点为止,不继续往下传递
|
||||
{break, NewServiceName, NewData, NewTag} ->
|
||||
#{
|
||||
<<"c">> => 1,
|
||||
<<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag},
|
||||
<<"k">> => false
|
||||
};
|
||||
error ->
|
||||
#{
|
||||
<<"c">> => 0,
|
||||
<<"r">> => Msg,
|
||||
<<"k">> => true
|
||||
}
|
||||
end,
|
||||
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
|
||||
ok = gen_tcp:send(Socket, Packet),
|
||||
|
||||
{noreply, State};
|
||||
|
||||
%% 其他消息为非法消息
|
||||
handle_info({handle_packet, _Packet}, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{socket = Socket}) ->
|
||||
gen_tcp:close(Socket),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
|
||||
next_packet_id(PacketId) when PacketId >= 65535 ->
|
||||
0;
|
||||
next_packet_id(PacketId) ->
|
||||
PacketId + 1.
|
||||
|
||||
-spec pack(PacketId :: integer(), Type :: integer()) -> binary().
|
||||
pack(PacketId, Type) when is_integer(PacketId), is_integer(Type) ->
|
||||
<<PacketId:16, Type:8>>.
|
||||
|
||||
-spec pack(PacketId :: integer(), Type :: integer(), Body :: map()) -> binary().
|
||||
pack(PacketId, Type, Body) when is_integer(PacketId), is_integer(Type), is_map(Body) ->
|
||||
Message = iolist_to_binary(jiffy:encode(Body, [force_utf8])),
|
||||
<<PacketId:16, Type:8, Message/binary>>.
|
||||
|
||||
-spec unpack(binary()) -> #efka_packet{}.
|
||||
unpack(<<PacketId:16, Type:8, Body/binary>>) ->
|
||||
Message = catch jiffy:decode(Body, [return_maps]),
|
||||
#efka_packet{packet_id = PacketId, type = Type, message = Message}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% simple callbacks
|
||||
%%%===================================================================
|
||||
|
||||
handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) ->
|
||||
case power_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
#{
|
||||
<<"c">> => 1,
|
||||
<<"r">> => #{
|
||||
<<"edge_status">> => -1,
|
||||
<<"message">> => <<"设备信息不存在"/utf8>>
|
||||
}
|
||||
};
|
||||
Pid ->
|
||||
StatusMap = #{
|
||||
0 => <<"离线"/utf8>>,
|
||||
1 => <<"在线"/utf8>>
|
||||
},
|
||||
{ok, Status} = power_device:poll_status(Pid),
|
||||
#{
|
||||
<<"c">> => 1,
|
||||
<<"r">> => #{
|
||||
<<"edge_status">> => Status,
|
||||
<<"message">> => maps:get(Status, StatusMap)
|
||||
}
|
||||
}
|
||||
end.
|
||||
|
||||
-spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}.
|
||||
handle_param(Params) when is_map(Params) ->
|
||||
power_gateway_args:push_param(Params),
|
||||
ok.
|
||||
|
||||
-spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}.
|
||||
handle_metric(Metric) when is_list(Metric) ->
|
||||
power_gateway_args:push_metric(Metric),
|
||||
ok.
|
||||
|
||||
-spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) ->
|
||||
{continue, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()}
|
||||
| {break, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()}
|
||||
| error.
|
||||
handle_stream_call(ServiceName, Fields, Tag) when is_binary(ServiceName), is_list(Fields), is_map(Tag) ->
|
||||
{continue, ServiceName, Fields, Tag}.
|
||||
@ -62,13 +62,13 @@ stop(Pid, Reason) when is_pid(Pid) ->
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
start_link(Socket) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [Socket])}.
|
||||
gen_server:start_link(?MODULE, [Socket], []).
|
||||
|
||||
init(Socket) ->
|
||||
init([Socket]) ->
|
||||
efka_logger:debug("[sdlan_channel] get a new connection: ~p", [Socket]),
|
||||
ok = inet:setopts(Socket, [{active, true}, {packet, 4}]),
|
||||
|
||||
erlang:start_timer(?PING_TICKER, self(), ping_ticker),
|
||||
gen_tcp:controlling_process(Socket, self()),
|
||||
ok = inet:setopts(Socket, [binary, {active, true}, {packet, 4}]),
|
||||
% erlang:start_timer(?PING_TICKER, self(), ping_ticker),
|
||||
gen_server:enter_loop(?MODULE, [], #state{socket = Socket}).
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
@ -78,9 +78,8 @@ handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% 网络流量统计
|
||||
handle_info({tcp, _Sock, <<Body/binary>>}, State = #state{is_registered = true}) ->
|
||||
% #sdl_flows{forward_num = ForwardNum, p2p_num = P2PNum, inbound_num = InboundNum} = sdlan_pb:decode_msg(Body, sdl_flows),
|
||||
efka_logger:debug("[sdlan_channel] body: ~p", [Body]),
|
||||
handle_info({tcp, _Sock, Body}, State = #state{}) ->
|
||||
efka_logger:debug("[sdlan_channel] read body: ~p", [Body]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) ->
|
||||
@ -96,11 +95,11 @@ handle_info({stop, Reason}, State) ->
|
||||
{stop, Reason, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
efka_logger:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]),
|
||||
efka_logger:debug("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(Reason, #state{}) ->
|
||||
efka_logger:warning("[sdlan_channel] stop with reason: ~p", [Reason]),
|
||||
efka_logger:notice("[sdlan_channel] stop with reason: ~p", [Reason]),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
||||
@ -17,7 +17,7 @@ start_link(Port) ->
|
||||
|
||||
%% 监听循环
|
||||
init(Port) ->
|
||||
case gen_tcp:listen(Port, [binary, {active, false}, {reuseaddr, true}]) of
|
||||
case gen_tcp:listen(Port, [binary, {packet, 4}, {active, false}, {reuseaddr, true}]) of
|
||||
{ok, ListenSocket} ->
|
||||
efka_logger:debug("Server started on port ~p~n", [Port]),
|
||||
main_loop(ListenSocket);
|
||||
@ -29,9 +29,9 @@ init(Port) ->
|
||||
main_loop(ListenSocket) ->
|
||||
case gen_tcp:accept(ListenSocket) of
|
||||
{ok, Socket} ->
|
||||
efka_logger:debug("New client connected: ~p~n", [Socket]),
|
||||
% 为每个新连接生成一个处理进程
|
||||
efka_tcp_sup:start_child(Socket),
|
||||
{ok, ChannelPid} = efka_tcp_sup:start_child(Socket),
|
||||
ok = gen_tcp:controlling_process(Socket, ChannelPid),
|
||||
% 继续监听下一个连接
|
||||
main_loop(ListenSocket);
|
||||
{error, closed} ->
|
||||
|
||||
@ -26,19 +26,18 @@ start_link() ->
|
||||
%% type => worker(), % optional
|
||||
%% modules => modules()} % optional
|
||||
init([]) ->
|
||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||
{ok, {SupFlags, []}}.
|
||||
SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1},
|
||||
ChildSpec = #{
|
||||
id => tcp_channel,
|
||||
start => {tcp_channel, start_link, []},
|
||||
restart => temporary,
|
||||
type => worker
|
||||
},
|
||||
{ok, {SupFlags, [ChildSpec]}}.
|
||||
|
||||
%% internal functions
|
||||
|
||||
start_child(Socket) ->
|
||||
supervisor:start_child(?MODULE, #{
|
||||
id => make_ref(),
|
||||
start => {efka_tcp_channel, start_link, [Socket]},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['efka_tcp_channel']
|
||||
}).
|
||||
supervisor:start_child(?MODULE, [Socket]).
|
||||
|
||||
|
||||
|
||||
102
apps/efka/src/tcp_channel.erl
Normal file
102
apps/efka/src/tcp_channel.erl
Normal file
@ -0,0 +1,102 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2025, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 30. 4月 2025 09:22
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(tcp_channel).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {
|
||||
socket
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(Socket :: any()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(Socket) ->
|
||||
gen_server:start_link(?MODULE, [Socket], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([Socket]) ->
|
||||
ok = inet:setopts(Socket, [{active, true}]),
|
||||
lager:debug("[tcp_channel] get new socket: ~p", [Socket]),
|
||||
{ok, #state{socket = Socket}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(Info, State = #state{}) ->
|
||||
lager:debug("[tcp_channel] get info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
@ -2,7 +2,10 @@
|
||||
{deps, [
|
||||
{sync, ".*", {git, "https://github.com/rustyio/sync.git", {branch, "master"}}},
|
||||
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}},
|
||||
{gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}
|
||||
{gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}},
|
||||
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}},
|
||||
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
|
||||
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
|
||||
]}.
|
||||
|
||||
{relx, [{release, {efka, "0.1.0"},
|
||||
@ -40,4 +43,6 @@
|
||||
{pc, {git, "https://github.com/blt/port_compiler.git", {tag, "v1.15.0"}}}
|
||||
]}.
|
||||
|
||||
{erl_opts, [{parse_transform,lager_transform}]}.
|
||||
|
||||
{rebar_packages_cdn, "https://hexpm.upyun.com"}.
|
||||
Loading…
x
Reference in New Issue
Block a user