From 3db0159573742b1196f6799a1b5e54df2b09272a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 18 Aug 2025 17:06:33 +0800 Subject: [PATCH] add kafka support --- apps/iot/src/endpoint/endpoint_kafka.erl | 176 +++++++++++++++++++++++ apps/iot/src/endpoint/endpoint_mqtt.erl | 4 +- 2 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 apps/iot/src/endpoint/endpoint_kafka.erl diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl new file mode 100644 index 0000000..c3badcd --- /dev/null +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -0,0 +1,176 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_kafka). + +-include("endpoint.hrl"). +-behaviour(gen_server). + +%% API +-export([start_link/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-define(DISCONNECTED, disconnected). +-define(CONNECTED, connected). + +-record(state, { + endpoint :: #endpoint{}, + buffer :: endpoint_buffer:buffer(), + client_id :: atom(), + client_pid :: undefined | pid(), + status = ?DISCONNECTED +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) -> + gen_server:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([AliasName, Endpoint = #endpoint{id = Id}]) -> + iot_name_server:register(AliasName, self()), + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), connect), + %% 初始化存储 + Buffer = endpoint_buffer:new(Endpoint, 10), + ClientId = list_to_atom("brod_client:" ++ integer_to_list(Id)), + + {ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + {reply, {ok, Stat}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), + {noreply, State#state{buffer = NBuffer}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, + endpoint = #endpoint{title = Title, config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers0, topic = Topic}}}) -> + lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), + + BootstrapServers = lists:flatmap(fun(S) -> + case binary:split(S, <<":">>) of + [Host0, Port] -> + [{binary_to_list(Host0), binary_to_integer(Port)}]; + _ -> + [] + end + end, BootstrapServers0), + ClientConfig = [ + {reconnect_cool_down_seconds, 5}, + {socket_options, [{keepalive, true}]} + ], + case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of + {ok, ClientPid} -> + ok = brod:start_producer(ClientId, Topic, _ProducerConfig = []), + NBuffer = endpoint_buffer:trigger_next(Buffer), + {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; + {error, Reason} -> + lager:debug("[endpoint_kafka] start_client: ~p, get error: ~p", [ClientId, Reason]), + retry_connect(), + {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} + end; + +%% 离线时,忽略数据发送逻辑 +handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> + {keep_state, State}; +%% 发送数据到mqtt服务器 +handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid, + endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> + + ReceiverPid = self(), + AckCb = fun(Partition, BaseOffset) -> + lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]), + ReceiverPid ! {ack, Id} + end, + ok = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb), + + {noreply, State}; + +handle_info({ack, Id}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; + +%% postman进程挂掉时,重新建立新的 +handle_info({'EXIT', ClientPid, Reason}, State = #state{client_pid = ClientPid, endpoint = #endpoint{title = Title}}) -> + lager:warning("[endpoint_kafka] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), + retry_connect(), + {noreply, State#state{client_pid = undefined, status = ?DISCONNECTED}}; + +handle_info(Info, State = #state{status = Status}) -> + lager:warning("[endpoint_kafka] unknown message: ~p, status: ~p", [Info, Status]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) -> + lager:debug("[endpoint_kafka] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), + endpoint_buffer:cleanup(Buffer), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +retry_connect() -> + erlang:start_timer(?RETRY_INTERVAL, self(), connect). \ No newline at end of file diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index d501921..2f13a01 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -82,8 +82,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Format, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Format, Metric}, Buffer), +handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), {noreply, State#state{buffer = NBuffer}}. %% @private