ekfa/apps/efka/src/client/efka_client.erl
2025-05-09 00:06:47 +08:00

264 lines
10 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. 8月 2023 15:39
%%%-------------------------------------------------------------------
-module(efka_client).
-author("aresei").
-behaviour(gen_server).
%% 请求超时时间
-define(EFKA_REQUEST_TIMEOUT, 5000).
%% 消息类型
%% 服务注册
-define(PACKET_REGISTER, 16#00).
%% 消息响应
-define(PACKET_RESPONSE, 16#01).
%% 上传数据
-define(PACKET_METRIC_DATA, 16#02).
%% 微服务事件上报
-define(PACKET_EVENT, 16#03).
%% 微服务从efka获取自身的采集项
-define(PACKET_REQUEST_CONFIG, 16#04).
%% efka下发给微服务配置
-define(PACKET_PUSH_CONFIG, 16#10).
-define(PACKET_INVOKE, 16#11).
%% API
-export([start_link/3]).
-export([device_offline/1, device_online/1]).
-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1]).
-export([test/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
packet_id = 1 :: integer(),
host :: string(),
port :: integer(),
%% 请求后未完成的请求
inflight = #{} :: map(),
socket :: gen_tcp:socket(),
controller_process :: pid() | undefined
}).
test() ->
start_link(<<"test">>, "localhost", 18080).
-spec controller_process(ControllerPid :: pid()) -> ok.
controller_process(ControllerPid) when is_pid(ControllerPid) ->
gen_server:call(?MODULE, {controller_process, ControllerPid}).
-spec send_metric_data(DeviceUUID :: binary(), Measurement :: binary(), Tags :: map(), Fields :: map()) -> no_return().
send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) ->
gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}).
%% efka_server为了统一r对象为字符串需要2次json_decode
-spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}.
request_config() ->
{ok, Ref} = gen_server:call(?MODULE, {request_config, self()}),
receive
{response, Ref, {ok, Reply}} ->
Config = jiffy:decode(Reply, [return_maps]),
{ok, Config};
{response, Ref, {error, Reason}} ->
{error, Reason}
after
?EFKA_REQUEST_TIMEOUT ->
{error, timeout}
end.
-spec device_offline(DeviceUUID :: binary()) -> no_return().
device_offline(DeviceUUID) when is_binary(DeviceUUID) ->
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}).
-spec device_online(DeviceUUID :: binary()) -> no_return().
device_online(DeviceUUID) when is_binary(DeviceUUID) ->
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}).
-spec send_event(EventType :: integer(), Params :: binary()) -> no_return().
send_event(EventType, Params) when is_integer(EventType), is_binary(Params) ->
gen_server:cast(?MODULE, {send_event, EventType, Params}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ServiceId :: binary(), Host :: string(), Port :: integer()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ServiceId, Host, Port) when is_binary(ServiceId), is_list(Host), is_integer(Port) ->
gen_server:start_link(?MODULE, [ServiceId, Host, Port], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ServiceId, Host, Port]) ->
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
ok = gen_tcp:controlling_process(Socket, self()),
PacketId = 1,
Packet = <<?PACKET_REGISTER:8, PacketId:32, ServiceId/binary>>,
ok = gen_tcp:send(Socket, Packet),
lager:debug("[efka_client] will send packet: ~p", [Packet]),
receive
{tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, 1:8>>} ->
{ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}};
{tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, Error/binary>>} ->
{stop, Error}
after
?EFKA_REQUEST_TIMEOUT ->
{stop, register_timeout}
end.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
%% 设置主动推送消息的接受进程
handle_call({controller_process, ControllerPid}, _From, State) ->
{reply, ok, State#state{controller_process = ControllerPid}};
%% done
handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<?PACKET_REQUEST_CONFIG:8, PacketId:32>>,
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% done
handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #state{socket = Socket}) ->
%% 基于Line Protocol实现数据的传输
Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
Body = efka_point:normalized(Point),
Len = byte_size(DeviceUUID),
Packet = <<?PACKET_METRIC_DATA, Len:8, DeviceUUID/binary, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% done
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Packet = <<?PACKET_EVENT:8, EventType:16, Params/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
handle_cast(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% 收到请求的响应
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, PacketId:32, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
error ->
{noreply, State};
{{Ref, ReceiverPid}, NInflight} ->
case Message of
<<1:8, Result/binary>> ->
ReceiverPid ! {response, Ref, {ok, Result}};
<<0:8, Error/binary>> ->
ReceiverPid ! {response, Ref, {error, Error}}
end,
{noreply, State#state{inflight = NInflight}}
end;
%% 收到efka推送的参数设置
handle_info({tcp, Socket, <<?PACKET_PUSH_CONFIG:8, PacketId:32, ConfigJson/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
ControllerPid ! {push_config, Ref, ConfigJson},
receive
{push_config_reply, Ref, ok} ->
<<1:8>>;
{push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = <<?PACKET_RESPONSE:8, PacketId:32, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% 其他消息为非法消息
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
lager:debug("[efka_client] get unknown packet: ~p", [Packet]),
{noreply, State};
handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
{stop, tcp_closed, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{socket = Socket}) ->
gen_tcp:close(Socket),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% 采用32位编码
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
next_packet_id(PacketId) when PacketId >= 4294967295 ->
1;
next_packet_id(PacketId) ->
PacketId + 1.
%%%===================================================================
%%% simple callbacks
%%%===================================================================