fix efka client

This commit is contained in:
anlicheng 2025-05-09 10:53:13 +08:00
parent ad5f06be0e
commit 7f0cb1d027
2 changed files with 95 additions and 65 deletions

View File

@ -16,21 +16,11 @@
%%
%%
-define(PACKET_REGISTER, 16#00).
-define(PACKET_REQUEST, 16#01).
%%
-define(PACKET_RESPONSE, 16#01).
-define(PACKET_RESPONSE, 16#02).
%%
-define(PACKET_METRIC_DATA, 16#02).
%%
-define(PACKET_EVENT, 16#03).
%% efka获取自身的采集项
-define(PACKET_REQUEST_CONFIG, 16#04).
%% efka下发给微服务配置
-define(PACKET_PUSH_CONFIG, 16#10).
-define(PACKET_INVOKE, 16#11).
-define(PACKET_PUSH, 16#02).
%% API
-export([start_link/3]).
@ -73,8 +63,7 @@ request_config() ->
{ok, Config};
{response, Ref, {error, Reason}} ->
{error, Reason}
after
?EFKA_REQUEST_TIMEOUT ->
after 5000 ->
{error, timeout}
end.
@ -114,16 +103,20 @@ init([ServiceId, Host, Port]) ->
ok = gen_tcp:controlling_process(Socket, self()),
PacketId = 1,
Packet = <<?PACKET_REGISTER:8, PacketId:32, ServiceId/binary>>,
ok = gen_tcp:send(Socket, Packet),
Register = #{<<"id">> => PacketId, <<"method">> => <<"register">>, <<"params">> => #{<<"service_id">> => ServiceId}},
Packet = jiffy:encode(Register, [force_utf8]),
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
lager:debug("[efka_client] will send packet: ~p", [Packet]),
receive
{tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, 1:8>>} ->
{tcp, Socket, <<?PACKET_RESPONSE, Data/binary>>} ->
case catch jiffy:decode(Data, [return_maps]) of
#{<<"id">> := PacketId, <<"result">> := <<"ok">>} ->
{ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}};
{tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, Error/binary>>} ->
{stop, Error}
after
?EFKA_REQUEST_TIMEOUT ->
#{<<"id">> := PacketId, <<"error">> := #{<<"code">> := Code, <<"message">> := Error}} ->
{stop, {error, {Code, Error}}}
end
after 5000 ->
{stop, register_timeout}
end.
@ -143,8 +136,9 @@ handle_call({controller_process, ControllerPid}, _From, State) ->
%% done
handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<?PACKET_REQUEST_CONFIG:8, PacketId:32>>,
ok = gen_tcp:send(Socket, Packet),
RequestConfig = #{<<"id">> => PacketId, <<"method">> => <<"request_config">>},
Packet = jiffy:encode(RequestConfig, [force_utf8]),
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
@ -160,17 +154,33 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #
%% Line Protocol实现数据的传输
Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
Body = efka_point:normalized(Point),
Len = byte_size(DeviceUUID),
Packet = <<?PACKET_METRIC_DATA, Len:8, DeviceUUID/binary, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
MetricData = #{
<<"id">> => 0,
<<"method">> => <<"metric_data">>,
<<"params">> => #{
<<"device_uuid">> => DeviceUUID,
<<"metric">> => Body
}
},
Packet = jiffy:encode(MetricData, [force_utf8]),
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
{noreply, State};
%% done
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Packet = <<?PACKET_EVENT:8, EventType:16, Params/binary>>,
ok = gen_tcp:send(Socket, Packet),
handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) ->
Event = #{
<<"id">> => 0,
<<"method">> => <<"event">>,
<<"params">> => #{
<<"event_type">> => EventType,
<<"body">> => Body
}
},
Packet = jiffy:encode(Event, [force_utf8]),
ok = gen_tcp:send(Socket, <<?PACKET_REQUEST, Packet/binary>>),
{noreply, State};
@ -185,42 +195,62 @@ handle_cast(_Info, State = #state{}) ->
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, PacketId:32, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
handle_info({tcp, Socket, <<?PACKET_RESPONSE, Packet/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case jiffy:decode(Packet, [return_maps]) of
#{<<"id">> := Id, <<"result">> := Result} ->
case maps:take(Id, Inflight) of
error ->
{noreply, State};
{{Ref, ReceiverPid}, NInflight} ->
case Message of
<<1:8, Result/binary>> ->
ReceiverPid ! {response, Ref, {ok, Result}};
<<0:8, Error/binary>> ->
ReceiverPid ! {response, Ref, {error, Error}}
end,
ReceiverPid ! {response, Ref, {ok, Result}},
{noreply, State#state{inflight = NInflight}}
end;
#{<<"id">> := Id, <<"error">> := #{<<"code">> := Code, <<"message">> := Message}} ->
case maps:take(Id, Inflight) of
error ->
{noreply, State};
{{Ref, ReceiverPid}, NInflight} ->
ReceiverPid ! {response, Ref, {error, {Code, Message}}},
{noreply, State#state{inflight = NInflight}}
end
end;
%% efka推送的参数设置
handle_info({tcp, Socket, <<?PACKET_PUSH_CONFIG:8, PacketId:32, ConfigJson/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
case jiffy:decode(Packet, [return_maps]) of
#{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := ConfigJson} ->
Ref = make_ref(),
ControllerPid ! {push_config, Ref, ConfigJson},
Reply =
receive
{push_config_reply, Ref, ok} ->
<<1:8>>;
#{<<"id">> => Id, <<"result">> => <<"ok">>};
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"timeout">>}}
end,
Packet = <<?PACKET_RESPONSE:8, PacketId:32, Message/binary>>,
Packet = jiffy:encode(Reply, [force_utf8]),
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
#{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload, <<"timeout">> := Timeout}} ->
Ref = make_ref(),
ControllerPid ! {invoke, Ref, Payload},
Reply =
receive
{invoke_reply, Ref, {ok, Result}} ->
#{<<"id">> => Id, <<"result">> => Result};
{invoke_reply, Ref, {error, Reason}} when is_binary(Reason) ->
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}
after Timeout * 1000 ->
#{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"invoke timeout">>}}
end,
Packet = jiffy:encode(Reply, [force_utf8]),
ok = gen_tcp:send(Socket, Packet),
{noreply, State}
end;
%%
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
lager:debug("[efka_client] get unknown packet: ~p", [Packet]),

View File

@ -145,7 +145,7 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST_CONFIG:8, PacketId:32>>}, State = #s
{noreply, State};
%%
handle_info({tcp, Socket, <<?PACKET_METRIC_DATA:8, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
handle_info({tcp, Socket, <<?PACKET_METRIC_DATA:8, Len:16, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
efka_service:metric_data(ServicePid, DeviceUUID, Data),
{noreply, State};