This commit is contained in:
anlicheng 2025-05-06 16:48:09 +08:00
parent bd9e3b6858
commit 82325ac440

View File

@ -42,6 +42,7 @@
-define(PACKET_TYPE_REQUEST_PARAM, 12). -define(PACKET_TYPE_REQUEST_PARAM, 12).
%% %%
-define(PACKET_TYPE_EVENT, 15). -define(PACKET_TYPE_EVENT, 15).
-define(PACKET_TYPE_AI_EVENT, 16).
%% API %% API
-export([start_link/3]). -export([start_link/3]).
@ -75,8 +76,8 @@ send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) ->
{ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}),
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
-spec send_log(Message :: binary() | map()) -> no_return(). -spec send_log(Message :: binary()) -> no_return().
send_log(Message) when is_binary(Message); is_map(Message) -> send_log(Message) when is_binary(Message) ->
gen_server:cast(?MODULE, {send_log, Message}). gen_server:cast(?MODULE, {send_log, Message}).
%% efka_server为了统一r对象为字符串2json_decode %% efka_server为了统一r对象为字符串2json_decode
@ -174,14 +175,6 @@ init([MicroServiceId, Host, Port]) ->
handle_call({controller_process, ControllerPid}, _From, State) -> handle_call({controller_process, ControllerPid}, _From, State) ->
{reply, ok, State#state{controller_process = ControllerPid}}; {reply, ok, State#state{controller_process = ControllerPid}};
handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]),
Packet = <<PacketId:32, ?PACKET_TYPE_METRIC_DATA, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_TYPE_REQUEST_METRIC:8>>, Packet = <<PacketId:32, ?PACKET_TYPE_REQUEST_METRIC:8>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
@ -202,9 +195,15 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket,
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({send_log, Message}, State = #state{socket = Socket}) -> handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"l">> => Message}, [force_utf8]), Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_LOG:8, Body/binary>>, Packet = <<0:32, ?PACKET_TYPE_METRIC_DATA, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
handle_cast({send_log, LogMessage}, State = #state{socket = Socket}) ->
Packet = <<0:32, ?PACKET_TYPE_LOG:8, LogMessage/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
@ -216,6 +215,13 @@ handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
{noreply, State}; {noreply, State};
handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]),
Packet = <<0:32, ?PACKET_TYPE_AI_EVENT:8, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
handle_cast(_Info, State = #state{}) -> handle_cast(_Info, State = #state{}) ->
{noreply, State}. {noreply, State}.
@ -286,36 +292,6 @@ handle_info({tcp, <<PacketId:32, ?PACKET_TYPE_PUSH_METRIC:8, Metrics/binary>>},
{noreply, State}; {noreply, State};
%%
%% #{
%% <<"c">> => 1,
%% <<"r">> => #{
%% <<"edge_status">> => Status,
%% <<"message">> => maps:get(Status, StatusMap)
%% }
%% }
handle_info({tcp, <<PacketId:32, ?PACKET_TYPE_POLL:8, Command/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
ControllerPid ! {poll, Ref, Command},
receive
{poll_reply, Ref, {ok, Reply}} when is_binary(Reply) ->
<<1:8, Reply/binary>>;
{poll_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = <<PacketId:32, ?PACKET_TYPE_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% %%
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
lager:debug("[efka_client] get unknown packet: ~p", [Packet]), lager:debug("[efka_client] get unknown packet: ~p", [Packet]),