From 7a9d18cf5137cf5d9d8d3d4a925766a3bf8552f6 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 30 Apr 2025 09:51:03 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3tcp=5Fserver=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/efka/src/efka.app.src | 4 +- apps/efka/src/efka_client.erl | 437 +++++++++++++++++++++++++++++ apps/efka/src/efka_tcp_channel.erl | 19 +- apps/efka/src/efka_tcp_server.erl | 6 +- apps/efka/src/efka_tcp_sup.erl | 19 +- apps/efka/src/tcp_channel.erl | 102 +++++++ rebar.config | 7 +- 7 files changed, 569 insertions(+), 25 deletions(-) create mode 100644 apps/efka/src/efka_client.erl create mode 100644 apps/efka/src/tcp_channel.erl diff --git a/apps/efka/src/efka.app.src b/apps/efka/src/efka.app.src index 519f84d..5124934 100644 --- a/apps/efka/src/efka.app.src +++ b/apps/efka/src/efka.app.src @@ -6,9 +6,11 @@ {applications, [ sync, - %jiffy, + jiffy, %gpb, %mnesia, + parse_trans, + lager, crypto, inets, ssl, diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl new file mode 100644 index 0000000..55eda12 --- /dev/null +++ b/apps/efka/src/efka_client.erl @@ -0,0 +1,437 @@ +%%%------------------------------------------------------------------- +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 28. 8月 2023 15:39 +%%%------------------------------------------------------------------- +-module(efka_client). +-author("aresei"). + +-behaviour(gen_server). + +%% 消息包 +-record(efka_packet, { + packet_id :: integer(), + type :: integer(), + message :: any() +}). + +%% 请求超时时间 +-define(EFKA_REQUEST_TIMEOUT, 5000). + +%% 消息类型 + +%% 服务注册 +-define(PACKET_TYPE_REGISTER, 16). +%% 上传数据 +-define(PACKET_TYPE_METRIC_DATA, 3). +%% 调用其他微服务 +-define(PACKET_TYPE_INVOKE, 4). +%% 消息响应 +-define(PACKET_TYPE_RESPONSE, 7). +%% efka下发给微服务参数 +-define(PACKET_TYPE_PUSH_PARAM, 5). +%% efka下发给微服务采集项 +-define(PACKET_TYPE_PUSH_METRIC, 6). +%% 设备状态轮询: 增加日期: 2025-4-16 +-define(PACKET_TYPE_POLL, 20). +%% 微服务给efka发送log消息 +-define(PACKET_TYPE_LOG, 9). +%% 微服务从efka获取自身的采集项 +-define(PACKET_TYPE_REQUEST_METRIC, 10). +%% 微服务从efka获取自身的参数 +-define(PACKET_TYPE_REQUEST_PARAM, 12). +%% efka向微服务发送stream-call消息 +-define(PACKET_TYPE_PUSH_STREAM_CALL, 11). +%% 微服务事件上报 +-define(PACKET_TYPE_EVENT, 15). + +%% API +-export([start_link/3]). +-export([device_offline/1, device_online/1]). +-export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2]). + +-export([test/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + packet_id = 0 :: integer(), + host :: string(), + port :: integer(), + %% 请求后未完成的请求 + inflight = #{} :: map(), + socket :: gen_tcp:socket() +}). + +-spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}. +send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) -> + {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), + await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). + +-spec invoke_service(ToService :: binary(), Message :: map(), Timeout :: integer()) -> + {ok, Result :: any()} | {error, Reason :: any()}. +invoke_service(ToService, Message, Timeout) when is_binary(ToService), is_map(Message), is_integer(Timeout) -> + {ok, Ref} = gen_server:call(?MODULE, {invoke_service, self(), ToService, Message, Timeout}), + await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). + +-spec send_log(Message :: binary() | map()) -> no_return(). +send_log(Message) when is_binary(Message); is_map(Message) -> + gen_server:cast(?MODULE, {send_log, Message}). + +%% efka_server为了统一,r对象为字符串;需要2次json_decode +-spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}. +request_metric() -> + {ok, Ref} = gen_server:call(?MODULE, {request_metric, self()}), + case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of + {ok, Reply} -> + {ok, jiffy:decode(Reply, [return_maps])}; + Error -> + Error + end. + +-spec request_param() -> {ok, Result :: map()} | {error, Reason :: any()}. +request_param() -> + {ok, Ref} = gen_server:call(?MODULE, {request_param, self()}), + case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of + {ok, Reply} -> + {ok, jiffy:decode(Reply, [return_maps])}; + Error -> + Error + end. + +-spec device_offline(DeviceUUID :: binary()) -> no_return(). +device_offline(DeviceUUID) when is_binary(DeviceUUID) -> + send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}). + +-spec device_online(DeviceUUID :: binary()) -> no_return(). +device_online(DeviceUUID) when is_binary(DeviceUUID) -> + send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}). + +-spec send_event(EventType :: integer(), Params :: binary() | map()) -> no_return(). +send_event(EventType, Params) when is_integer(EventType), is_binary(Params); is_map(Params) -> + gen_server:cast(?MODULE, {send_event, EventType, Params}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: any()} | {error, Reason :: any()}. +await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> + receive + {response, Ref, {ok, Reply}} -> + {ok, Reply}; + {response, Ref, {error, Reason}} -> + {error, Reason} + after + Timeout -> + {error, timeout} + end. + +test() -> + start_link(<<"test">>, "localhost", 18080). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(RegisterName :: binary(), Host :: string(), Port :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host), is_integer(Port) -> + gen_server:start_link(?MODULE, [RegisterName, Host, Port], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([RegisterName, Host, Port]) -> + {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), + ok = gen_tcp:controlling_process(Socket, self()), + case do_register(RegisterName, Socket) of + ok -> + {ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}}; + {error, Reason} -> + {stop, Reason} + end. + +%% 执行到efka服务器的注册 +do_register(RegisterName, Socket) -> + PacketId = 0, + Body = #{<<"name">> => RegisterName}, + Packet = pack(PacketId, ?PACKET_TYPE_REGISTER, Body), + ok = gen_tcp:send(Socket, Packet), + lager:debug("[efka_client] will send packet: ~p", [Packet]), + receive + {tcp, Socket, Data} -> + RegisterPacket = unpack(Data), + lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]), + case RegisterPacket of + #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} -> + ok; + #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 0, <<"e">> := Error}} -> + {error, Error}; + _ -> + {error, invalid_register_packet} + end + after + ?EFKA_REQUEST_TIMEOUT -> + {error, timeout} + end. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Body = #{<<"data">> => Fields, <<"tag">> => Tags}, + Packet = pack(PacketId, ?PACKET_TYPE_METRIC_DATA, Body), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + +handle_call({invoke_service, ReceiverPid, ToService, Message, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Body = #{ + <<"to">> => ToService, + <<"t">> => Timeout, + <<"m">> => Message + }, + Packet = pack(PacketId, ?PACKET_TYPE_INVOKE, Body), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + +handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_METRIC), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + +handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_PARAM), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({send_log, Message}, State = #state{socket = Socket, packet_id = PacketId}) -> + Body = #{<<"l">> => Message}, + Packet = pack(PacketId, ?PACKET_TYPE_LOG, Body), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State#state{packet_id = next_packet_id(PacketId)}}; + +handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> + Body = #{ + <<"event_type">> => EventType, + <<"params">> => Params + }, + Packet = pack(0, ?PACKET_TYPE_EVENT, Body), + ok = gen_tcp:send(Socket, Packet), + {noreply, State}; + +handle_cast(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({tcp, Socket, Data}, State = #state{socket = Socket}) -> + Packet = unpack(Data), + self() ! {handle_packet, Packet}, + {noreply, State}; +handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) -> + {stop, tcp_closed, State}; + +%% 收到请求的响应 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = Message}}, State = #state{inflight = Inflight}) -> + case maps:take(PacketId, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + case Message of + #{<<"c">> := 1, <<"r">> := Result} -> + ReceiverPid ! {response, Ref, {ok, Result}}; + #{<<"c">> := 0, <<"e">> := Error} -> + ReceiverPid ! {response, Ref, {error, Error}} + end, + {noreply, State#state{inflight = NInflight}} + end; + +%% 收到efka推送的参数设置 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_PARAM, message = Params}}, + State = #state{socket = Socket}) when is_map(Params) -> + + Message = case handle_param(Params) of + ok -> + #{<<"c">> => 1, <<"r">> => <<"ok">>}; + {error, Reason} -> + #{<<"c">> => 0, <<"e">> => Reason} + end, + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 收到efka推送的采集项消息 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_METRIC, message = Metrics}}, + State = #state{socket = Socket}) when is_list(Metrics) -> + + Message = case handle_metric(Metrics) of + ok -> + #{<<"c">> => 1, <<"r">> => <<"ok">>}; + {error, Reason} when is_binary(Reason) -> + #{<<"c">> => 0, <<"e">> => Reason} + end, + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 收到设备状态的轮询请求 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_POLL, message = Command}}, State = #state{socket = Socket}) -> + Message = handle_poll_command(Command), + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 收到efka的stream-call消息 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_STREAM_CALL, + message = Msg = #{<<"service_name">> := ServiceName, <<"data">> := Data, <<"tag">> := Tag}}}, State = #state{socket = Socket}) -> + + Message = case handle_stream_call(ServiceName, Data, Tag) of + {continue, NewServiceName, NewData, NewTag} -> + #{ + <<"c">> => 1, + <<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag}, + <<"k">> => true + }; + %% 处理到当前节点为止,不继续往下传递 + {break, NewServiceName, NewData, NewTag} -> + #{ + <<"c">> => 1, + <<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag}, + <<"k">> => false + }; + error -> + #{ + <<"c">> => 0, + <<"r">> => Msg, + <<"k">> => true + } + end, + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 其他消息为非法消息 +handle_info({handle_packet, _Packet}, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{socket = Socket}) -> + gen_tcp:close(Socket), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). +next_packet_id(PacketId) when PacketId >= 65535 -> + 0; +next_packet_id(PacketId) -> + PacketId + 1. + +-spec pack(PacketId :: integer(), Type :: integer()) -> binary(). +pack(PacketId, Type) when is_integer(PacketId), is_integer(Type) -> + <>. + +-spec pack(PacketId :: integer(), Type :: integer(), Body :: map()) -> binary(). +pack(PacketId, Type, Body) when is_integer(PacketId), is_integer(Type), is_map(Body) -> + Message = iolist_to_binary(jiffy:encode(Body, [force_utf8])), + <>. + +-spec unpack(binary()) -> #efka_packet{}. +unpack(<>) -> + Message = catch jiffy:decode(Body, [return_maps]), + #efka_packet{packet_id = PacketId, type = Type, message = Message}. + +%%%=================================================================== +%%% simple callbacks +%%%=================================================================== + +handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) -> + case power_device:get_pid(DeviceUUID) of + undefined -> + #{ + <<"c">> => 1, + <<"r">> => #{ + <<"edge_status">> => -1, + <<"message">> => <<"设备信息不存在"/utf8>> + } + }; + Pid -> + StatusMap = #{ + 0 => <<"离线"/utf8>>, + 1 => <<"在线"/utf8>> + }, + {ok, Status} = power_device:poll_status(Pid), + #{ + <<"c">> => 1, + <<"r">> => #{ + <<"edge_status">> => Status, + <<"message">> => maps:get(Status, StatusMap) + } + } + end. + +-spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}. +handle_param(Params) when is_map(Params) -> + power_gateway_args:push_param(Params), + ok. + +-spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}. +handle_metric(Metric) when is_list(Metric) -> + power_gateway_args:push_metric(Metric), + ok. + +-spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) -> + {continue, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()} + | {break, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()} + | error. +handle_stream_call(ServiceName, Fields, Tag) when is_binary(ServiceName), is_list(Fields), is_map(Tag) -> + {continue, ServiceName, Fields, Tag}. \ No newline at end of file diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index df6eeb2..ed33db4 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -62,13 +62,13 @@ stop(Pid, Reason) when is_pid(Pid) -> %%-------------------------------------------------------------------- start_link(Socket) -> - {ok, proc_lib:spawn_link(?MODULE, init, [Socket])}. + gen_server:start_link(?MODULE, [Socket], []). -init(Socket) -> +init([Socket]) -> efka_logger:debug("[sdlan_channel] get a new connection: ~p", [Socket]), - ok = inet:setopts(Socket, [{active, true}, {packet, 4}]), - - erlang:start_timer(?PING_TICKER, self(), ping_ticker), + gen_tcp:controlling_process(Socket, self()), + ok = inet:setopts(Socket, [binary, {active, true}, {packet, 4}]), + % erlang:start_timer(?PING_TICKER, self(), ping_ticker), gen_server:enter_loop(?MODULE, [], #state{socket = Socket}). handle_call(_Request, _From, State) -> @@ -78,9 +78,8 @@ handle_cast(_Msg, State) -> {noreply, State}. %% 网络流量统计 -handle_info({tcp, _Sock, <>}, State = #state{is_registered = true}) -> - % #sdl_flows{forward_num = ForwardNum, p2p_num = P2PNum, inbound_num = InboundNum} = sdlan_pb:decode_msg(Body, sdl_flows), - efka_logger:debug("[sdlan_channel] body: ~p", [Body]), +handle_info({tcp, _Sock, Body}, State = #state{}) -> + efka_logger:debug("[sdlan_channel] read body: ~p", [Body]), {noreply, State}; handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> @@ -96,11 +95,11 @@ handle_info({stop, Reason}, State) -> {stop, Reason, State}; handle_info(Info, State) -> - efka_logger:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]), + efka_logger:debug("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]), {noreply, State}. terminate(Reason, #state{}) -> - efka_logger:warning("[sdlan_channel] stop with reason: ~p", [Reason]), + efka_logger:notice("[sdlan_channel] stop with reason: ~p", [Reason]), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/efka/src/efka_tcp_server.erl b/apps/efka/src/efka_tcp_server.erl index efd6f31..45c03a9 100644 --- a/apps/efka/src/efka_tcp_server.erl +++ b/apps/efka/src/efka_tcp_server.erl @@ -17,7 +17,7 @@ start_link(Port) -> %% 监听循环 init(Port) -> - case gen_tcp:listen(Port, [binary, {active, false}, {reuseaddr, true}]) of + case gen_tcp:listen(Port, [binary, {packet, 4}, {active, false}, {reuseaddr, true}]) of {ok, ListenSocket} -> efka_logger:debug("Server started on port ~p~n", [Port]), main_loop(ListenSocket); @@ -29,9 +29,9 @@ init(Port) -> main_loop(ListenSocket) -> case gen_tcp:accept(ListenSocket) of {ok, Socket} -> - efka_logger:debug("New client connected: ~p~n", [Socket]), % 为每个新连接生成一个处理进程 - efka_tcp_sup:start_child(Socket), + {ok, ChannelPid} = efka_tcp_sup:start_child(Socket), + ok = gen_tcp:controlling_process(Socket, ChannelPid), % 继续监听下一个连接 main_loop(ListenSocket); {error, closed} -> diff --git a/apps/efka/src/efka_tcp_sup.erl b/apps/efka/src/efka_tcp_sup.erl index c65a50c..b4f23bc 100644 --- a/apps/efka/src/efka_tcp_sup.erl +++ b/apps/efka/src/efka_tcp_sup.erl @@ -26,19 +26,18 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - {ok, {SupFlags, []}}. + SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1}, + ChildSpec = #{ + id => tcp_channel, + start => {tcp_channel, start_link, []}, + restart => temporary, + type => worker + }, + {ok, {SupFlags, [ChildSpec]}}. %% internal functions start_child(Socket) -> - supervisor:start_child(?MODULE, #{ - id => make_ref(), - start => {efka_tcp_channel, start_link, [Socket]}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['efka_tcp_channel'] - }). + supervisor:start_child(?MODULE, [Socket]). diff --git a/apps/efka/src/tcp_channel.erl b/apps/efka/src/tcp_channel.erl new file mode 100644 index 0000000..dc07066 --- /dev/null +++ b/apps/efka/src/tcp_channel.erl @@ -0,0 +1,102 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 30. 4月 2025 09:22 +%%%------------------------------------------------------------------- +-module(tcp_channel). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + socket +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Socket :: any()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Socket) -> + gen_server:start_link(?MODULE, [Socket], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Socket]) -> + ok = inet:setopts(Socket, [{active, true}]), + lager:debug("[tcp_channel] get new socket: ~p", [Socket]), + {ok, #state{socket = Socket}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(Info, State = #state{}) -> + lager:debug("[tcp_channel] get info: ~p", [Info]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/rebar.config b/rebar.config index 7cddb0e..b91b703 100644 --- a/rebar.config +++ b/rebar.config @@ -2,7 +2,10 @@ {deps, [ {sync, ".*", {git, "https://github.com/rustyio/sync.git", {branch, "master"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}}, - {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}} + {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}, + {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, + {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, + {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}. {relx, [{release, {efka, "0.1.0"}, @@ -40,4 +43,6 @@ {pc, {git, "https://github.com/blt/port_compiler.git", {tag, "v1.15.0"}}} ]}. +{erl_opts, [{parse_transform,lager_transform}]}. + {rebar_packages_cdn, "https://hexpm.upyun.com"}. \ No newline at end of file