diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl index ef85625..b4d7fb5 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/efka_client.erl @@ -23,26 +23,26 @@ %% 消息类型 %% 服务注册 --define(PACKET_TYPE_REGISTER, 16). +-define(PACKET_REGISTER, 16). %% 上传数据 --define(PACKET_TYPE_METRIC_DATA, 3). +-define(PACKET_METRIC_DATA, 3). %% 消息响应 --define(PACKET_TYPE_RESPONSE, 7). +-define(PACKET_RESPONSE, 7). %% efka下发给微服务参数 --define(PACKET_TYPE_PUSH_PARAM, 5). +-define(PACKET_PUSH_PARAM, 5). %% efka下发给微服务采集项 --define(PACKET_TYPE_PUSH_METRIC, 6). -%% 设备状态轮询: 增加日期: 2025-4-16 --define(PACKET_TYPE_POLL, 20). +-define(PACKET_PUSH_METRIC, 6). + %% 微服务给efka发送log消息 --define(PACKET_TYPE_LOG, 9). +-define(PACKET_LOG, 9). + %% 微服务从efka获取自身的采集项 --define(PACKET_TYPE_REQUEST_METRIC, 10). +-define(PACKET_REQUEST_METRIC, 10). %% 微服务从efka获取自身的参数 --define(PACKET_TYPE_REQUEST_PARAM, 12). +-define(PACKET_REQUEST_PARAM, 12). %% 微服务事件上报 --define(PACKET_TYPE_EVENT, 15). --define(PACKET_TYPE_AI_EVENT, 16). +-define(PACKET_EVENT, 15). +-define(PACKET_AI_EVENT, 16). %% API -export([start_link/3]). @@ -148,13 +148,13 @@ init([MicroServiceId, Host, Port]) -> {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), ok = gen_tcp:controlling_process(Socket, self()), - Packet = <<1:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>, + Packet = <<1:32, ?PACKET_REGISTER:8, MicroServiceId/binary>>, ok = gen_tcp:send(Socket, Packet), lager:debug("[efka_client] will send packet: ~p", [Packet]), receive - {tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 1:8>>} -> + {tcp, Socket, <<1:32, ?PACKET_RESPONSE, 1:8>>} -> {ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}}; - {tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 0:8, Error/binary>>} -> + {tcp, Socket, <<1:32, ?PACKET_RESPONSE, 0:8, Error/binary>>} -> {stop, Error} after ?EFKA_REQUEST_TIMEOUT -> @@ -176,14 +176,14 @@ handle_call({controller_process, ControllerPid}, _From, State) -> {reply, ok, State#state{controller_process = ControllerPid}}; handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), Ref = make_ref(), @@ -197,27 +197,27 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, {stop, Reason :: term(), NewState :: #state{}}). handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) -> Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]), - Packet = <<0:32, ?PACKET_TYPE_METRIC_DATA, Body/binary>>, + Packet = <<0:32, ?PACKET_METRIC_DATA, Body/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; handle_cast({send_log, LogMessage}, State = #state{socket = Socket}) -> - Packet = <<0:32, ?PACKET_TYPE_LOG:8, LogMessage/binary>>, + Packet = <<0:32, ?PACKET_LOG:8, LogMessage/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), - Packet = <<0:32, ?PACKET_TYPE_EVENT:8, Body/binary>>, + Packet = <<0:32, ?PACKET_EVENT:8, Body/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) -> Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), - Packet = <<0:32, ?PACKET_TYPE_AI_EVENT:8, Body/binary>>, + Packet = <<0:32, ?PACKET_AI_EVENT:8, Body/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; @@ -233,7 +233,7 @@ handle_cast(_Info, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 收到请求的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> case maps:take(PacketId, Inflight) of error -> {noreply, State}; @@ -248,7 +248,7 @@ handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of true -> Ref = make_ref(), @@ -264,13 +264,13 @@ handle_info({tcp, Socket, < <<0:8, "处理进程异常"/utf8>> end, - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; %% 收到efka推送的采集项消息 -handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> +handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of true -> Ref = make_ref(), @@ -287,7 +287,7 @@ handle_info({tcp, <>}, <<0:8, "处理进程异常"/utf8>> end, - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; diff --git a/apps/efka/src/tcp_channel.erl b/apps/efka/src/tcp_channel.erl index bbf9e68..abe6362 100644 --- a/apps/efka/src/tcp_channel.erl +++ b/apps/efka/src/tcp_channel.erl @@ -23,7 +23,7 @@ %% 消息类型 %% 服务注册 --define(PACKET_TYPE_REGISTER, 16). +-define(PACKET_REGISTER, 16). %% 上传数据 -define(PACKET_TYPE_METRIC_DATA, 3). %% 消息响应 @@ -137,7 +137,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 注册 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> ok = gen_tcp:send(Socket, <>), {noreply, State#state{micro_service_id = MicroServiceId, is_registered = true}};