fix client

This commit is contained in:
anlicheng 2025-05-06 16:52:14 +08:00
parent 82325ac440
commit f7e827df38
2 changed files with 28 additions and 28 deletions

View File

@ -23,26 +23,26 @@
%% %%
%% %%
-define(PACKET_TYPE_REGISTER, 16). -define(PACKET_REGISTER, 16).
%% %%
-define(PACKET_TYPE_METRIC_DATA, 3). -define(PACKET_METRIC_DATA, 3).
%% %%
-define(PACKET_TYPE_RESPONSE, 7). -define(PACKET_RESPONSE, 7).
%% efka下发给微服务参数 %% efka下发给微服务参数
-define(PACKET_TYPE_PUSH_PARAM, 5). -define(PACKET_PUSH_PARAM, 5).
%% efka下发给微服务采集项 %% efka下发给微服务采集项
-define(PACKET_TYPE_PUSH_METRIC, 6). -define(PACKET_PUSH_METRIC, 6).
%% : : 2025-4-16
-define(PACKET_TYPE_POLL, 20).
%% efka发送log消息 %% efka发送log消息
-define(PACKET_TYPE_LOG, 9). -define(PACKET_LOG, 9).
%% efka获取自身的采集项 %% efka获取自身的采集项
-define(PACKET_TYPE_REQUEST_METRIC, 10). -define(PACKET_REQUEST_METRIC, 10).
%% efka获取自身的参数 %% efka获取自身的参数
-define(PACKET_TYPE_REQUEST_PARAM, 12). -define(PACKET_REQUEST_PARAM, 12).
%% %%
-define(PACKET_TYPE_EVENT, 15). -define(PACKET_EVENT, 15).
-define(PACKET_TYPE_AI_EVENT, 16). -define(PACKET_AI_EVENT, 16).
%% API %% API
-export([start_link/3]). -export([start_link/3]).
@ -148,13 +148,13 @@ init([MicroServiceId, Host, Port]) ->
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
ok = gen_tcp:controlling_process(Socket, self()), ok = gen_tcp:controlling_process(Socket, self()),
Packet = <<1:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>, Packet = <<1:32, ?PACKET_REGISTER:8, MicroServiceId/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
lager:debug("[efka_client] will send packet: ~p", [Packet]), lager:debug("[efka_client] will send packet: ~p", [Packet]),
receive receive
{tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 1:8>>} -> {tcp, Socket, <<1:32, ?PACKET_RESPONSE, 1:8>>} ->
{ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}}; {ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}};
{tcp, Socket, <<1:32, ?PACKET_TYPE_RESPONSE, 0:8, Error/binary>>} -> {tcp, Socket, <<1:32, ?PACKET_RESPONSE, 0:8, Error/binary>>} ->
{stop, Error} {stop, Error}
after after
?EFKA_REQUEST_TIMEOUT -> ?EFKA_REQUEST_TIMEOUT ->
@ -176,14 +176,14 @@ handle_call({controller_process, ControllerPid}, _From, State) ->
{reply, ok, State#state{controller_process = ControllerPid}}; {reply, ok, State#state{controller_process = ControllerPid}};
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_TYPE_REQUEST_METRIC:8>>, Packet = <<PacketId:32, ?PACKET_REQUEST_METRIC:8>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(), Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_TYPE_REQUEST_PARAM:8>>, Packet = <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(), Ref = make_ref(),
@ -197,27 +197,27 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket,
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) -> handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]), Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_METRIC_DATA, Body/binary>>, Packet = <<0:32, ?PACKET_METRIC_DATA, Body/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
handle_cast({send_log, LogMessage}, State = #state{socket = Socket}) -> handle_cast({send_log, LogMessage}, State = #state{socket = Socket}) ->
Packet = <<0:32, ?PACKET_TYPE_LOG:8, LogMessage/binary>>, Packet = <<0:32, ?PACKET_LOG:8, LogMessage/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_EVENT:8, Body/binary>>, Packet = <<0:32, ?PACKET_EVENT:8, Body/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) -> handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_AI_EVENT:8, Body/binary>>, Packet = <<0:32, ?PACKET_AI_EVENT:8, Body/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
@ -233,7 +233,7 @@ handle_cast(_Info, State = #state{}) ->
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% %%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) -> handle_info({tcp, Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of case maps:take(PacketId, Inflight) of
error -> error ->
{noreply, State}; {noreply, State};
@ -248,7 +248,7 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary
end; end;
%% efka推送的参数设置 %% efka推送的参数设置
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_PUSH_PARAM:8, Params/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> handle_info({tcp, Socket, <<PacketId:32, ?PACKET_PUSH_PARAM:8, Params/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true -> true ->
Ref = make_ref(), Ref = make_ref(),
@ -264,13 +264,13 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_PUSH_PARAM:8, Params/binar
false -> false ->
<<0:8, "处理进程异常"/utf8>> <<0:8, "处理进程异常"/utf8>>
end, end,
Packet = <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>, Packet = <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
%% efka推送的采集项消息 %% efka推送的采集项消息
handle_info({tcp, <<PacketId:32, ?PACKET_TYPE_PUSH_METRIC:8, Metrics/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> handle_info({tcp, <<PacketId:32, ?PACKET_PUSH_METRIC:8, Metrics/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true -> true ->
Ref = make_ref(), Ref = make_ref(),
@ -287,7 +287,7 @@ handle_info({tcp, <<PacketId:32, ?PACKET_TYPE_PUSH_METRIC:8, Metrics/binary>>},
<<0:8, "处理进程异常"/utf8>> <<0:8, "处理进程异常"/utf8>>
end, end,
Packet = <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>, Packet = <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};

View File

@ -23,7 +23,7 @@
%% %%
%% %%
-define(PACKET_TYPE_REGISTER, 16). -define(PACKET_REGISTER, 16).
%% %%
-define(PACKET_TYPE_METRIC_DATA, 3). -define(PACKET_TYPE_METRIC_DATA, 3).
%% %%
@ -137,7 +137,7 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% %%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_TYPE_REGISTER:8, MicroServiceId/binary>>}, State = #state{socket = Socket}) -> handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, MicroServiceId/binary>>}, State = #state{socket = Socket}) ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, 1:8>>), ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, 1:8>>),
{noreply, State#state{micro_service_id = MicroServiceId, is_registered = true}}; {noreply, State#state{micro_service_id = MicroServiceId, is_registered = true}};