%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023,
%%% @doc
%%% Kafka endpoint: a gen_server that buffers forwarded metrics and
%%% publishes them to a Kafka topic through a brod client.
%%% @end
%%% Created : 06. Jul 2023 12:02
%%%-------------------------------------------------------------------
-module(endpoint_kafka).
-include("endpoint.hrl").

-behaviour(gen_server).

%% API
-export([start_link/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%% Retry interval in milliseconds; used to re-arm the `connect' timer
%% after a failed or lost client connection.
-define(RETRY_INTERVAL, 5000).

-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).

%% Process state:
%%   endpoint   - the endpoint definition this process serves
%%   buffer     - pending messages, managed by endpoint_buffer
%%   client_id  - registered name handed to brod for the client
%%   client_pid - pid of the linked brod client, `undefined' while offline
%%   status     - ?CONNECTED | ?DISCONNECTED
-record(state, {
    endpoint :: #endpoint{},
    buffer :: endpoint_buffer:buffer(),
    client_id :: atom(),
    client_pid :: undefined | pid(),
    status = ?DISCONNECTED
}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Creates a gen_server process, registered locally as `LocalName',
%% which calls Module:init/1 to initialize. To ensure a synchronized
%% start-up procedure, this function does not return until Module:init/1
%% has returned. `AliasName' and `Endpoint' are handed to init/1.
-spec(start_link(LocalName :: atom(), AliasName :: atom(), Endpoint :: #endpoint{}) ->
    {ok, pid()} | ignore | {error, Reason :: term()}).
start_link(LocalName, AliasName, Endpoint = #endpoint{})
    when is_atom(LocalName), is_atom(AliasName) ->
    gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
%% @doc Whenever a gen_server is started using gen_server:start/[3,4] or
%% gen_server:start_link/[3,4], this function is called by the new
%% process to initialize.
init([AliasName, Endpoint = #endpoint{id = Id}]) ->
    iot_name_server:register(AliasName, self()),
    %% Trap exits so that the death of the linked brod client arrives as an
    %% 'EXIT' message instead of killing this process.
    erlang:process_flag(trap_exit, true),
    %% Create the forwarder lazily: a zero-delay timer defers the connection
    %% attempt to handle_info/2 so that init/1 itself never blocks.
    erlang:start_timer(0, self(), connect),
    %% Initialise the storage buffer.
    %% NOTE(review): the meaning of the second argument (10) is defined by
    %% endpoint_buffer:new/2 and is not visible from this module.
    Buffer = endpoint_buffer:new(Endpoint, 10),
    %% One client name per endpoint id.
    ClientId = list_to_atom("brod_client:" ++ integer_to_list(Id)),
    {ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId}}.

%% @private
%% @doc Handling call messages. Only `get_stat' is supported; it replies
%% with `{ok, Stat}' where `Stat' comes from endpoint_buffer:stat/1.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
    Stat = endpoint_buffer:stat(Buffer),
    {reply, {ok, Stat}, State}.

%% @private
%% @doc Handling cast messages. `{forward, ServiceId, Metric}' appends the
%% `{ServiceId, Metric}' pair to the buffer; nothing is sent from here.
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
    NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer),
    {noreply, State#state{buffer = NBuffer}}.

%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
    endpoint = #endpoint{title = Title, config = #kafka_endpoint{sasl_config = SaslConfig,
        bootstrap_servers = BootstrapServers, topic = Topic}}}) ->
    %% Connect timer fired while offline: start a linked brod client and a
    %% producer for the configured topic.
    lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]),

    BaseConfig = [
        {reconnect_cool_down_seconds, 5},
        {socket_options, [{keepalive, true}]}
    ],

    %% SASL credentials are optional; any other shape of sasl_config is a
    %% configuration error and crashes with case_clause.
    ClientConfig = case SaslConfig of
        {Mechanism, Username, Password} ->
            [{sasl, {Mechanism, Username, Password}} | BaseConfig];
        undefined ->
            BaseConfig
    end,

    case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
        {ok, ClientPid} ->
            %% Assertive match: a producer that cannot be started crashes
            %% this process.
            ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []),
            %% Ask the buffer to hand over the next pending entry, if any.
            NBuffer = endpoint_buffer:trigger_next(Buffer),
            {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}};
        {error, Reason} ->
            lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]),
            retry_connect(),
            {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
    end;

%% While offline, ignore requests to send data.
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
    {noreply, State};

%% Send the data to the Kafka broker.
handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid,
    endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
    ReceiverPid = self(),
    %% The callback is not evaluated in this process, so the ack is relayed
    %% back here as a message and applied to the buffer in handle_info/2.
    AckCb = fun(Partition, BaseOffset) ->
        lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]),
        ReceiverPid ! {ack, Id}
    end,
    ProduceResult = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb),
    %% A rejected produce request never triggers AckCb, so surface it in the
    %% log instead of discarding the result.
    %% NOTE(review): whether the entry is re-delivered afterwards depends on
    %% endpoint_buffer, which is not visible from this module.
    case check_produce_result(ProduceResult) of
        true ->
            ok;
        false ->
            lager:warning("[endpoint_kafka] produce failed, id: ~p, result: ~p", [Id, ProduceResult])
    end,
    {noreply, State};

%% The broker acknowledged entry `Id': acknowledge it in the buffer.
handle_info({ack, Id}, State = #state{buffer = Buffer}) ->
    NBuffer = endpoint_buffer:ack(Id, Buffer),
    {noreply, State#state{buffer = NBuffer}};

%% When the postman (brod client) process dies, schedule the creation of a
%% new one. Only exits of the currently tracked client pid match here.
handle_info({'EXIT', ClientPid, Reason}, State = #state{client_pid = ClientPid, endpoint = #endpoint{title = Title}}) ->
    lager:warning("[endpoint_kafka] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
    retry_connect(),
    {noreply, State#state{client_pid = undefined, status = ?DISCONNECTED}};

%% Drain anything else so unexpected messages do not pile up unnoticed.
handle_info(Info, State = #state{status = Status}) ->
    lager:warning("[endpoint_kafka] unknown message: ~p, status: ~p", [Info, Status]),
    {noreply, State}.

%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
    lager:debug("[endpoint_kafka] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
    endpoint_buffer:cleanup(Buffer),
    ok.

%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Arms a timer that delivers `{timeout, Ref, connect}' to this process
%% after ?RETRY_INTERVAL milliseconds.
retry_connect() ->
    erlang:start_timer(?RETRY_INTERVAL, self(), connect).

%% Returns `true' when a produce request was accepted (`ok' or
%% `{ok, Partition}') and `false' when it was rejected (`{error, Reason}').
check_produce_result(ok) ->
    true;
check_produce_result({ok, _}) ->
    true;
check_produce_result({error, _}) ->
    false.