From 46f752414140fa82d0e2efdfb4ea86ea3f264a91 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 30 Apr 2025 11:05:44 +0800 Subject: [PATCH] fix tcp_channel --- apps/efka/src/tcp_channel.erl | 102 +++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 2 deletions(-) diff --git a/apps/efka/src/tcp_channel.erl b/apps/efka/src/tcp_channel.erl index dc07066..af76c28 100644 --- a/apps/efka/src/tcp_channel.erl +++ b/apps/efka/src/tcp_channel.erl @@ -13,22 +13,67 @@ %% API -export([start_link/1]). +-export([push_metric/2, push_param/2, poll/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). +%% 消息类型 + +%% 服务注册 +-define(PACKET_TYPE_REGISTER, 16). +%% 上传数据 +-define(PACKET_TYPE_METRIC_DATA, 3). +%% 消息响应 +-define(PACKET_TYPE_RESPONSE, 7). +%% efka下发给微服务参数 +-define(PACKET_TYPE_PUSH_PARAM, 5). +%% efka下发给微服务采集项 +-define(PACKET_TYPE_PUSH_METRIC, 6). +%% 设备状态轮询: 增加日期: 2025-4-16 +-define(PACKET_TYPE_POLL, 20). +%% 微服务给efka发送log消息 +-define(PACKET_TYPE_LOG, 9). +%% 微服务从efka获取自身的采集项 +-define(PACKET_TYPE_REQUEST_METRIC, 10). +%% 微服务从efka获取自身的参数 +-define(PACKET_TYPE_REQUEST_PARAM, 12). +%% 微服务事件上报 +-define(PACKET_TYPE_EVENT, 15). + -record(state, { - socket + packet_id = 1, + socket :: gen_tcp:socket(), + micro_service_id :: undefined | binary(), + is_registered = false :: boolean(), + + %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}} + inflight = #{} }). %%%=================================================================== %%% API %%%=================================================================== +push_param(ChannelPid, Params) when is_pid(ChannelPid), is_binary(Params) -> + Ref = make_ref(), + gen_server:cast(ChannelPid, {push_param, Ref, self(), Params}), + {ok, Ref}. + +push_metric(ChannelPid, Metrics) when is_pid(ChannelPid), is_binary(Metrics) -> + Ref = make_ref(), + gen_server:cast(ChannelPid, {push_metric, Ref, self(), Metrics}), + {ok, Ref}. + +poll(ChannelPid, Command) when is_pid(ChannelPid), is_binary(Command) -> + Ref = make_ref(), + gen_server:cast(ChannelPid, {poll, Ref, self(), Command}), + {ok, Ref}. + %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Socket :: any()) -> +-spec(start_link(Socket :: gen_tcp:socket()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link(Socket) -> gen_server:start_link(?MODULE, [Socket], []). @@ -66,6 +111,19 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +%% 推送参数项目 +handle_cast({push_param, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; +%% 推送采集项目 +handle_cast({push_metric, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; +%% 推送poll请求 +handle_cast({poll, Ref, ReceiverPid, Command}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + handle_cast(_Request, State = #state{}) -> {noreply, State}. @@ -75,6 +133,46 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +%% 注册 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{micro_service_id = MicroServiceId, is_registered = true}}; + +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + lager:debug("[tcp_channel] get metric data: ~p", [Body]), + + ok = gen_tcp:send(Socket, <>), + {noreply, State}; + +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State}; + +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State}; + +%% 收到端上的响应 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> + case maps:take(PacketId, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Response, PacketId]), + {noreply, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + case Response of + <<1:8, Result/binary>> -> + ReceiverPid ! {channel_reply, Ref, {ok, Result}}; + <<0:8, Error/binary>> -> + ReceiverPid ! {channel_reply, Ref, {error, Error}} + end; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Response, PacketId]) + end, + {noreply, State#state{inflight = NInflight}} + end; + handle_info(Info, State = #state{}) -> lager:debug("[tcp_channel] get info: ~p", [Info]), {noreply, State}.