%%%-------------------------------------------------------------------
%%% @copyright (C) 2023,
%%% @doc
%%% gen_server that owns a TCP connection to an efka server. It registers
%%% a service when started, sends metric data, events and topic
%%% subscriptions, requests configuration, and forwards messages pushed by
%%% the server to a configurable controller process.
%%% @end
%%% Created : 28. Aug 2023 15:39
%%%-------------------------------------------------------------------
-module(efka_client).
-author("aresei").

-behaviour(gen_server).

%% Request timeout, in milliseconds.
-define(EFKA_REQUEST_TIMEOUT, 5000).

%% Packet types.
%% Service registration.
-define(PACKET_REQUEST, 16#01).
%% Response to a message.
-define(PACKET_RESPONSE, 16#02).
%% Data upload.
-define(PACKET_PUSH, 16#03).
-define(PACKET_PUB, 16#04).

%% API
-export([start_link/3]).
-export([device_offline/1, device_online/1]).
-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1, subscribe/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {
    %% Id that will be used for the next request.
    packet_id = 1 :: integer(),
    host :: string(),
    port :: integer(),
    %% Requests that have been sent but are not completed yet,
    %% keyed by packet id.
    inflight = #{} :: map(),
    socket :: gen_tcp:socket(),
    %% Process that receives messages pushed by the server.
    controller_process :: pid() | undefined
}).

%% @doc Sets the process that receives the messages pushed by the server.
%% Synchronous: returns once the server process has stored the pid.
-spec controller_process(ControllerPid :: pid()) -> ok.
controller_process(ControllerPid) when is_pid(ControllerPid) ->
    gen_server:call(?MODULE, {controller_process, ControllerPid}).

%% @doc Asynchronously sends one metric point for a device.
%% The spec returns `ok' (the result of gen_server:cast/2); `no_return()'
%% would claim that the function never returns.
-spec send_metric_data(DeviceUUID :: binary(), Measurement :: binary(), Tags :: map(), Fields :: map()) -> ok.
send_metric_data(DeviceUUID, Measurement, Tags, Fields)
    when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) ->
    gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}).

%% @doc Asynchronously subscribes to a topic.
-spec subscribe(Topic :: binary()) -> ok.
subscribe(Topic) when is_binary(Topic) ->
    gen_server:cast(?MODULE, {subscribe, Topic}).

%% For uniformity efka_server returns the result object as a string, so it
%% has to be JSON-decoded a second time.
-spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}.
request_config() ->
    %% The server replies immediately with a reference; the actual result is
    %% delivered later as a {response, Ref, _} message to this process.
    {ok, Ref} = gen_server:call(?MODULE, {request_config, self()}),
    receive
        {response, Ref, {ok, Reply}} ->
            %% The result is itself a JSON string, hence the extra decode.
            Config = jiffy:decode(Reply, [return_maps]),
            {ok, Config};
        {response, Ref, {error, Reason}} ->
            {error, Reason}
    after ?EFKA_REQUEST_TIMEOUT ->
        {error, timeout}
    end.

%% @doc Reports that a device went offline (event type 1, status 0).
-spec device_offline(DeviceUUID :: binary()) -> ok.
device_offline(DeviceUUID) when is_binary(DeviceUUID) ->
    send_device_status(DeviceUUID, 0).

%% @doc Reports that a device came online (event type 1, status 1).
-spec device_online(DeviceUUID :: binary()) -> ok.
device_online(DeviceUUID) when is_binary(DeviceUUID) ->
    send_device_status(DeviceUUID, 1).

%% @doc Asynchronously sends an event; Params is the already encoded body.
%% Returns `ok' (the result of gen_server:cast/2).
-spec send_event(EventType :: integer(), Params :: binary()) -> ok.
send_event(EventType, Params) when is_integer(EventType), is_binary(Params) ->
    gen_server:cast(?MODULE, {send_event, EventType, Params}).

%% Encodes a device status change and sends it as an event of type 1.
-spec send_device_status(DeviceUUID :: binary(), Status :: 0 | 1) -> ok.
send_device_status(DeviceUUID, Status) ->
    EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => Status}, [force_utf8]),
    send_event(1, EventBody).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ServiceId :: binary(), Host :: string(), Port :: integer()) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ServiceId, Host, Port) when is_binary(ServiceId), is_list(Host), is_integer(Port) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [ServiceId, Host, Port], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([ServiceId, Host, Port]) ->
    %% The connecting process already owns the socket, so no
    %% controlling_process/2 hand-over is needed.
    case gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]) of
        {ok, Socket} ->
            register_service(Socket, ServiceId, Host, Port);
        {error, Reason} ->
            {stop, {connect_failed, Reason}}
    end.

%% Sends the "register" request and waits for the server's answer.
%% Returns the initial state on success, {stop, Reason} otherwise.
%%
%% NOTE(review): the binary framing in this file was lost in extraction
%% (it appeared as `<>'). It is reconstructed here as one packet-type byte
%% followed by the JSON payload; confirm the type byte used for each frame
%% against the server.
-spec register_service(Socket :: gen_tcp:socket(), ServiceId :: binary(), Host :: string(), Port :: integer()) ->
    {ok, #state{}} | {stop, Reason :: term()}.
register_service(Socket, ServiceId, Host, Port) ->
    PacketId = 1,
    Packet = jiffy:encode(#{
        <<"id">> => PacketId,
        <<"method">> => <<"register">>,
        <<"params">> => #{<<"service_id">> => ServiceId}
    }, [force_utf8]),
    lager:debug("[efka_client] will send packet: ~p", [Packet]),
    %% Sent as iodata so it works whether the encoder returned a binary or
    %% an iolist.
    ok = gen_tcp:send(Socket, [?PACKET_REQUEST, Packet]),
    receive
        {tcp, Socket, <<?PACKET_RESPONSE, Data/binary>>} ->
            case decode_register_reply(Data) of
                #{<<"id">> := PacketId, <<"result">> := <<"ok">>} ->
                    {ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}};
                #{<<"id">> := PacketId, <<"error">> := #{<<"code">> := Code, <<"message">> := Error}} ->
                    gen_tcp:close(Socket),
                    {stop, {error, {Code, Error}}};
                Other ->
                    %% Unexpected shape or undecodable payload: stop with a
                    %% descriptive reason instead of a case_clause crash.
                    gen_tcp:close(Socket),
                    {stop, {invalid_register_reply, Other}}
            end;
        {tcp_closed, Socket} ->
            {stop, tcp_closed}
    after ?EFKA_REQUEST_TIMEOUT ->
        gen_tcp:close(Socket),
        {stop, register_timeout}
    end.

%% Decodes the registration reply; a decoding failure is turned into a
%% tagged tuple so that the caller can report it.
-spec decode_register_reply(Data :: binary()) -> term().
decode_register_reply(Data) ->
    try
        jiffy:decode(Data, [return_maps])
    catch
        _Class:Reason ->
            {invalid_json, Reason}
    end.

%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Sets the process that receives the messages pushed by the server.
handle_call({controller_process, ControllerPid}, _From, State) ->
    {reply, ok, State#state{controller_process = ControllerPid}};

%% Sends a "request_config" request. The caller gets a reference back at
%% once; the result is later delivered to ReceiverPid as
%% {response, Ref, Result}, either from the server's answer or from the
%% request timer started here.
handle_call({request_config, ReceiverPid}, _From,
    State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
    Packet = jiffy:encode(#{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, [force_utf8]),
    %% NOTE(review): packet-type byte reconstructed, see register_service/4.
    ok = gen_tcp:send(Socket, [?PACKET_REQUEST, Packet]),
    erlang:start_timer(?EFKA_REQUEST_TIMEOUT, self(), {request_timeout, PacketId}),
    Ref = make_ref(),
    NState = State#state{
        packet_id = next_packet_id(PacketId),
        inflight = Inflight#{PacketId => {Ref, ReceiverPid}}
    },
    {reply, {ok, Ref}, NState}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #state{socket = Socket}) ->
    %% The metric is transferred in Line Protocol form.
    Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
    Body = efka_point:normalized(Point),
    ok = send_notification(Socket, <<"metric_data">>, #{
        <<"device_uuid">> => DeviceUUID,
        <<"metric">> => Body
    }),
    {noreply, State};

handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) ->
    ok = send_notification(Socket, <<"event">>, #{
        <<"event_type">> => EventType,
        <<"body">> => Body
    }),
    {noreply, State};

handle_cast({subscribe, Topic}, State = #state{socket = Socket}) ->
    ok = send_notification(Socket, <<"subscribe">>, #{<<"topic">> => Topic}),
    {noreply, State};

handle_cast(_Info, State = #state{}) ->
    {noreply, State}.

%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Response to an asynchronous request that this client sent to efka.
%% NOTE(review): the binary patterns of the three {tcp, ...} clauses were
%% lost in extraction (they appeared as `<>'); they are reconstructed from
%% the packet-type macros and the clause comments. Confirm against the
%% server.
handle_info({tcp, Socket, <<?PACKET_RESPONSE, Packet/binary>>}, State = #state{socket = Socket}) ->
    case jiffy:decode(Packet, [return_maps]) of
        #{<<"id">> := Id, <<"result">> := Result} ->
            {noreply, deliver_response(Id, {ok, Result}, State)};
        #{<<"id">> := Id, <<"error">> := #{<<"code">> := Code, <<"message">> := Message}} ->
            {noreply, deliver_response(Id, {error, {Code, Message}}, State)};
        Other ->
            lager:warning("[efka_client] get malformed response: ~p", [Other]),
            {noreply, State}
    end;

%% A request timed out.
handle_info({timeout, _TimerRef, {request_timeout, PacketId}}, State = #state{}) ->
    {noreply, deliver_response(PacketId, {error, {-1, <<"request timeout">>}}, State)};

%% Request pushed by efka (configuration or invocation). It must be
%% answered, so no timeout is applied while waiting for the controller.
handle_info({tcp, Socket, <<?PACKET_PUSH, Packet/binary>>},
    State = #state{socket = Socket, controller_process = ControllerPid}) ->
    case jiffy:decode(Packet, [return_maps]) of
        #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson}} ->
            ok = send_push_reply(Socket, Id,
                call_controller(ControllerPid, push_config, push_config_reply, ConfigJson));
        #{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} ->
            ok = send_push_reply(Socket, Id,
                call_controller(ControllerPid, invoke, invoke_reply, Payload));
        Other ->
            lager:warning("[efka_client] get unsupported push: ~p", [Other])
    end,
    {noreply, State};

%% Pub/sub message.
handle_info({tcp, Socket, <<?PACKET_PUB, Packet/binary>>},
    State = #state{socket = Socket, controller_process = ControllerPid}) ->
    #{<<"topic">> := Topic, <<"content">> := Content} = jiffy:decode(Packet, [return_maps]),
    case is_pid(ControllerPid) of
        true ->
            ControllerPid ! {pub, Topic, Content};
        false ->
            %% Sending to `undefined' would raise badarg and crash the server.
            lager:warning("[efka_client] drop pub on topic ~p: controller process not set", [Topic])
    end,
    {noreply, State};

%% Any other packet is illegal.
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
    lager:debug("[efka_client] get unknown packet: ~p", [Packet]),
    {noreply, State};

handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
    {stop, tcp_closed, State};

handle_info({tcp_error, Socket, Reason}, State = #state{socket = Socket}) ->
    {stop, {tcp_error, Reason}, State};

%% Drain anything else (for example a late controller reply) so that it
%% neither crashes the server nor piles up in the mailbox.
handle_info(Info, State = #state{}) ->
    lager:debug("[efka_client] ignore unexpected message: ~p", [Info]),
    {noreply, State}.

%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(_Reason, _State = #state{socket = Socket}) ->
    gen_tcp:close(Socket),
    ok.

%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Packet ids are encoded in 32 bits: wrap around to 1 after the maximum.
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
next_packet_id(PacketId) when PacketId >= 4294967295 ->
    1;
next_packet_id(PacketId) ->
    PacketId + 1.

%% Writes one frame: the packet-type byte followed by the payload. The
%% payload is passed on as iodata, so it may be a binary or an iolist.
-spec send_frame(Socket :: gen_tcp:socket(), Type :: byte(), Payload :: iodata()) -> ok | {error, term()}.
send_frame(Socket, Type, Payload) ->
    gen_tcp:send(Socket, [Type, Payload]).

%% Sends a request for which no response is tracked (id 0).
%% NOTE(review): packet-type byte reconstructed; confirm that these
%% messages are framed as ?PACKET_REQUEST.
-spec send_notification(Socket :: gen_tcp:socket(), Method :: binary(), Params :: map()) -> ok | {error, term()}.
send_notification(Socket, Method, Params) ->
    Packet = jiffy:encode(#{
        <<"id">> => 0,
        <<"method">> => Method,
        <<"params">> => Params
    }, [force_utf8]),
    send_frame(Socket, ?PACKET_REQUEST, Packet).

%% Completes the in-flight request Id, if it is still pending, by sending
%% {response, Ref, Response} to its receiver. Returns the updated state.
-spec deliver_response(Id :: term(), Response :: {ok, term()} | {error, term()}, State :: #state{}) -> #state{}.
deliver_response(Id, Response, State = #state{inflight = Inflight}) ->
    case maps:take(Id, Inflight) of
        error ->
            State;
        {{Ref, ReceiverPid}, NInflight} ->
            ReceiverPid ! {response, Ref, Response},
            State#state{inflight = NInflight}
    end.

%% Forwards a pushed request to the controller process as
%% {RequestTag, self(), Ref, Arg} and blocks until it answers with
%% {ReplyTag, Ref, ok | {ok, Result} | {error, Binary}}. The controller is
%% monitored so that its death ends the wait instead of hanging the
%% server; an unset controller yields an error without sending anything.
-spec call_controller(ControllerPid :: pid() | undefined, RequestTag :: atom(), ReplyTag :: atom(), Arg :: term()) ->
    ok | {ok, term()} | {error, binary()}.
call_controller(ControllerPid, RequestTag, ReplyTag, Arg) when is_pid(ControllerPid) ->
    Ref = make_ref(),
    MRef = erlang:monitor(process, ControllerPid),
    ControllerPid ! {RequestTag, self(), Ref, Arg},
    Outcome =
        receive
            {ReplyTag, Ref, ok} ->
                ok;
            {ReplyTag, Ref, {ok, Result}} ->
                {ok, Result};
            {ReplyTag, Ref, {error, Reason}} when is_binary(Reason) ->
                {error, Reason};
            {'DOWN', MRef, process, _Pid, _Info} ->
                {error, <<"controller process down">>}
        end,
    erlang:demonitor(MRef, [flush]),
    Outcome;
call_controller(_NoController, _RequestTag, _ReplyTag, _Arg) ->
    {error, <<"controller process not set">>}.

%% Encodes the controller's outcome and sends it back as the response to
%% the pushed request Id.
-spec send_push_reply(Socket :: gen_tcp:socket(), Id :: term(), Outcome :: ok | {ok, term()} | {error, binary()}) ->
    ok | {error, term()}.
send_push_reply(Socket, Id, Outcome) ->
    JsonReply = jiffy:encode(push_reply(Id, Outcome), [force_utf8]),
    send_frame(Socket, ?PACKET_RESPONSE, JsonReply).

%% Builds the reply object for a pushed request.
-spec push_reply(Id :: term(), Outcome :: ok | {ok, term()} | {error, binary()}) -> map().
push_reply(Id, ok) ->
    #{<<"id">> => Id, <<"result">> => <<"ok">>};
push_reply(Id, {ok, Result}) ->
    #{<<"id">> => Id, <<"result">> => Result};
push_reply(Id, {error, Reason}) ->
    #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}}.
%%%=================================================================== %%% simple callbacks %%%===================================================================