iot_cloud/apps/iot/src/endpoint/endpoint_kafka.erl
2025-11-12 15:49:03 +08:00

183 lines
7.4 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7月 2023 12:02
%%%-------------------------------------------------------------------
-module(endpoint_kafka).
-include("endpoint.hrl").
-behaviour(gen_server).
%% API
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% 消息重发间隔
-define(RETRY_INTERVAL, 5000).
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
-record(state, {
endpoint :: #endpoint{},
buffer :: endpoint_buffer:buffer(),
client_id :: atom(),
client_pid :: undefined | pid(),
status = ?DISCONNECTED
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(LocalName, Endpoint = #endpoint{}) when is_atom(LocalName) ->
gen_server:start_link({local, LocalName}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Endpoint = #endpoint{id = Id, matcher = Matcher}]) ->
endpoint_subscription:subscribe(Matcher, self()),
erlang:process_flag(trap_exit, true),
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
erlang:start_timer(0, self(), connect),
%% 初始化存储
Buffer = endpoint_buffer:new(Endpoint, 10),
ClientId = list_to_atom("brod_client:" ++ integer_to_list(Id)),
{ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
Stat = endpoint_buffer:stat(Buffer),
{reply, {ok, Stat}, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer),
{noreply, State#state{buffer = NBuffer}}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
endpoint = #endpoint{title = Title, config = #kafka_endpoint{sasl_config = SaslConfig, bootstrap_servers = BootstrapServers, topic = Topic}}}) ->
lager:debug("[endpoint_kafka] endpoint: ~p, create postman", [Title]),
BaseConfig = [
{reconnect_cool_down_seconds, 5},
{socket_options, [{keepalive, true}]}
],
ClientConfig = case SaslConfig of
{Mechanism, Username, Password} ->
[{sasl, {Mechanism, Username, Password}}|BaseConfig];
undefined ->
BaseConfig
end,
case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, ClientPid} ->
case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of
ok ->
NBuffer = endpoint_buffer:trigger_next(Buffer),
{noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}};
{error, Reason} ->
lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]),
retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end;
Error ->
lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Error]),
retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end;
%% 离线时,忽略数据发送逻辑
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
{noreply, State};
%% 发送数据到mqtt服务器
handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid,
endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
ReceiverPid = self(),
AckCb = fun(Partition, BaseOffset) ->
lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]),
ReceiverPid ! {ack, Id}
end,
_ = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb),
{noreply, State};
handle_info({ack, Id}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
%% postman进程挂掉时重新建立新的
handle_info({'EXIT', ClientPid, Reason}, State = #state{client_pid = ClientPid, endpoint = #endpoint{title = Title}}) ->
lager:warning("[endpoint_kafka] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
retry_connect(),
{noreply, State#state{client_pid = undefined, status = ?DISCONNECTED}};
handle_info(Info, State = #state{status = Status}) ->
lager:warning("[endpoint_kafka] unknown message: ~p, status: ~p", [Info, Status]),
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
lager:debug("[endpoint_kafka] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
endpoint_buffer:cleanup(Buffer),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
retry_connect() ->
erlang:start_timer(?RETRY_INTERVAL, self(), connect).