From 8e491c0a54a850b28fc03b365ce5cdeac15219e8 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 4 Jun 2025 19:09:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9kafka=E7=9A=84?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/endpoint/src/endpoint.app.src | 1 + apps/endpoint/src/endpoint.erl | 7 +- apps/endpoint/src/endpoint_client_sup.erl | 60 +++++ ...oint_http.erl => endpoint_http_client.erl} | 2 +- apps/endpoint/src/endpoint_id_generator.erl | 104 +++++++++ apps/endpoint/src/endpoint_kafka_client.erl | 208 ++++++++++++++++++ ...oint_mqtt.erl => endpoint_mqtt_client.erl} | 4 +- apps/endpoint/src/endpoint_sup.erl | 48 ++-- rebar.config | 1 + 9 files changed, 401 insertions(+), 34 deletions(-) create mode 100644 apps/endpoint/src/endpoint_client_sup.erl rename apps/endpoint/src/{endpoint_http.erl => endpoint_http_client.erl} (99%) create mode 100644 apps/endpoint/src/endpoint_id_generator.erl create mode 100644 apps/endpoint/src/endpoint_kafka_client.erl rename apps/endpoint/src/{endpoint_mqtt.erl => endpoint_mqtt_client.erl} (99%) diff --git a/apps/endpoint/src/endpoint.app.src b/apps/endpoint/src/endpoint.app.src index f2ced32..4c03dfc 100644 --- a/apps/endpoint/src/endpoint.app.src +++ b/apps/endpoint/src/endpoint.app.src @@ -12,6 +12,7 @@ emqtt, parse_trans, lager, + brod, mnesia, ssl, public_key, diff --git a/apps/endpoint/src/endpoint.erl b/apps/endpoint/src/endpoint.erl index b6697c5..5951c1d 100644 --- a/apps/endpoint/src/endpoint.erl +++ b/apps/endpoint/src/endpoint.erl @@ -21,10 +21,13 @@ -spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}. start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) -> Name = get_name(Id), - endpoint_http:start_link(Name, Endpoint); + endpoint_http_client:start_link(Name, Endpoint); start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) -> Name = get_name(Id), - endpoint_mqtt:start_link(Name, Endpoint); + endpoint_mqtt_client:start_link(Name, Endpoint); +start_link(Endpoint = #endpoint{id = Id, config = #kafka_endpoint{}}) -> + Name = get_name(Id), + endpoint_kafka_client:start_link(Name, Endpoint); start_link(Endpoint = #endpoint{id = Id, config = #mysql_endpoint{}}) -> Name = get_name(Id), endpoint_mysql:start_link(Name, Endpoint). diff --git a/apps/endpoint/src/endpoint_client_sup.erl b/apps/endpoint/src/endpoint_client_sup.erl new file mode 100644 index 0000000..f7b6b3a --- /dev/null +++ b/apps/endpoint/src/endpoint_client_sup.erl @@ -0,0 +1,60 @@ +%%%------------------------------------------------------------------- +%% @doc endpoint top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(endpoint_client_sup). + +-behaviour(supervisor). +-include("endpoint.hrl"). + +-export([start_link/0]). +-export([ensured_endpoint_started/1, delete_endpoint/1]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions + +-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +ensured_endpoint_started(Endpoint = #endpoint{}) -> + case supervisor:start_child(?MODULE, child_spec(Endpoint)) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, {'already_started', Pid}} when is_pid(Pid) -> + {ok, Pid}; + {error, Error} -> + {error, Error} + end. + +delete_endpoint(Id) when is_integer(Id) -> + Name = endpoint:get_name(Id), + supervisor:terminate_child(?MODULE, Name), + supervisor:delete_child(?MODULE, Name). + +child_spec(Endpoint = #endpoint{id = Id}) -> + Name = endpoint:get_name(Id), + #{id => Name, + start => {endpoint, start_link, [Name, Endpoint]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['endpoint']}. \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_http.erl b/apps/endpoint/src/endpoint_http_client.erl similarity index 99% rename from apps/endpoint/src/endpoint_http.erl rename to apps/endpoint/src/endpoint_http_client.erl index 68581ec..9d3dffd 100644 --- a/apps/endpoint/src/endpoint_http.erl +++ b/apps/endpoint/src/endpoint_http_client.erl @@ -6,7 +6,7 @@ %%% @end %%% Created : 07. 5月 2024 11:17 %%%------------------------------------------------------------------- --module(endpoint_http). +-module(endpoint_http_client). -author("anlicheng"). -include("endpoint.hrl"). diff --git a/apps/endpoint/src/endpoint_id_generator.erl b/apps/endpoint/src/endpoint_id_generator.erl new file mode 100644 index 0000000..3396de0 --- /dev/null +++ b/apps/endpoint/src/endpoint_id_generator.erl @@ -0,0 +1,104 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 04. 6月 2025 17:55 +%%%------------------------------------------------------------------- +-module(endpoint_id_generator). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([next_id/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + id = 1 +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +next_id() -> + gen_server:call(?MODULE, next_id). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(next_id, _From, State = #state{id = Id}) -> + {reply, Id, State#state{id = Id +1}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/endpoint/src/endpoint_kafka_client.erl b/apps/endpoint/src/endpoint_kafka_client.erl new file mode 100644 index 0000000..2d6021e --- /dev/null +++ b/apps/endpoint/src/endpoint_kafka_client.erl @@ -0,0 +1,208 @@ + +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(endpoint_kafka_client). + +-include("endpoint.hrl"). +-behaviour(gen_server). + +%% API +-export([start_link/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-define(DISCONNECTED, disconnected). +-define(CONNECTED, connected). + +-record(state, { + endpoint :: #endpoint{}, + buffer :: endpoint_buffer:buffer(), + client_id :: atom(), + client_pid :: undefined | pid(), + %% 待确认的数据, #{PacketId :: integer() => Id :: integer()} + inflight = #{}, + + status = disconnected +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) -> + gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Endpoint = #endpoint{id = Id}]) -> + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + %% 初始化存储 + Buffer = endpoint_buffer:new(Endpoint, 10), + %% brod的clientId + ClientId = list_to_atom("brod_client_" ++ integer_to_list(Id)), + {ok, #state{client_id = ClientId, endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> + Stat = endpoint_buffer:stat(Buffer), + {reply, {ok, Stat}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer), + {noreply, State#state{buffer = NBuffer}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% bootstrap_servers的格式为: [<<"localhost:9092">>, <<"localhost:9093">>] +handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, + endpoint = #endpoint{title = Title, config = #kafka_endpoint{bootstrap_servers = BootstrapServers0, topic = Topic, username = Username, password = Password}}}) -> + BootstrapServers = lists:flatmap(fun(S) -> + case binary:split(S, <<":">>, [global, trim]) of + [Host, Port] -> + [{binary_to_list(Host), binary_to_integer(Port)}]; + _ -> + [] + end + end, BootstrapServers0), + lager:debug("[kafka_client] endpoint: ~p, create postman, bootstap_servers: ~p", [Title, BootstrapServers]), + + ClientConfig = [{reconnect_cool_down_seconds, 10}], + case brod:start_client(BootstrapServers, ClientId, ClientConfig) of + ok -> + case brod:start_producer(ClientId, Topic, []) of + ok -> + ClientPid = whereis(ClientId), + erlang:monitor(process, ClientPid), + + lager:debug("[kafka_client] start producer success, topic: ~p", [Topic]), + NBuffer = endpoint_buffer:trigger_n(Buffer), + {noreply, State#state{client_pid = ClientPid, status = ?CONNECTED, buffer = NBuffer}}; + {error, Reason} -> + lager:debug("[kafka_client] start producer, get error: ~p", [Reason]), + erlang:start_timer(5000, self(), create_postman), + {noreply, State} + end; + {error, Reason} -> + lager:debug("[kafka_client] start client, get error: ~p", [Reason]), + erlang:start_timer(5000, self(), create_postman), + {noreply, State} + end; + +%% 离线时,忽略数据发送逻辑 +handle_info({next_data, _Id, _Metadata, _Metric}, State = #state{status = ?DISCONNECTED}) -> + {noreply, State}; +%% 发送数据到kafka服务器 +handle_info({next_data, Id, Metadata, Metric}, State = #state{client_id = ClientId, status = ?CONNECTED, buffer = Buffer, + endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> + + %% 使用service_id左右hash的key + Args = case maps:get(<<"service_id">>, Metadata, undefined) of + undefined -> + [ClientId, Topic, random, <<"">>, Metric]; + Key -> + [ClientId, Topic, undefined, Key, Metric] + end, + lager:debug("[kafka_client] will publish topic: ~p, message: ~p", [Topic, Metric]), + + case erlang:apply(brod, produce_sync, Args)of + ok -> + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; + {error, Reason} -> + lager:warning("[kafka_client] send message to topic: ~p, get error: ~p", [Topic, Reason]), + reconnect_ticker(), + {noreply, State} + end; + +%% brod_client 进程挂掉时,重新建立新的 +handle_info({'DOWN', _Ref, process, ClientPid, Reason}, State = #state{client_pid = ClientPid, endpoint = #endpoint{title = Title}}) -> + lager:warning("[kafka_client] endpoint: ~p, brod_client conn pid exit with reason: ~p", [Title, Reason]), + reconnect_ticker(), + {noreply, State#state{client_pid = undefined, status = ?DISCONNECTED}}; + +handle_info(Info, State = #state{status = Status}) -> + lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) -> + lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), + endpoint_buffer:cleanup(Buffer), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec format_props(Props :: [{Key :: any(), Val :: any()}]) -> [{Key1 :: binary(), Val1 :: binary()}]. +format_props(Props) when is_list(Props) -> + format_props(Props, []). +format_props([], Acc) -> + Acc; +format_props([{Key, Val}|T], Acc) when is_binary(Key), is_integer(Val) -> + format_props(T, [{Key, integer_to_binary(Val)}|Acc]); +format_props([{Key, Val}|T], Acc) when is_binary(Key), is_float(Val) -> + NVal = list_to_binary(float_to_list(Val, [{decimals, 2}, compact])), + format_props(T, [{Key, NVal}|Acc]); +format_props([{Key, Val}|T], Acc) when is_binary(Key), is_binary(Val) -> + format_props(T, [{Key, Val}|Acc]); +format_props([_|T], Acc) -> + format_props(T, Acc). + +-spec reconnect_ticker() -> no_return(). +reconnect_ticker() -> + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman). \ No newline at end of file diff --git a/apps/endpoint/src/endpoint_mqtt.erl b/apps/endpoint/src/endpoint_mqtt_client.erl similarity index 99% rename from apps/endpoint/src/endpoint_mqtt.erl rename to apps/endpoint/src/endpoint_mqtt_client.erl index 9ccf463..38794f6 100644 --- a/apps/endpoint/src/endpoint_mqtt.erl +++ b/apps/endpoint/src/endpoint_mqtt_client.erl @@ -7,7 +7,7 @@ %%% @end %%% Created : 06. 7月 2023 12:02 %%%------------------------------------------------------------------- --module(endpoint_mqtt). +-module(endpoint_mqtt_client). -include("endpoint.hrl"). -behaviour(gen_server). @@ -151,7 +151,7 @@ handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTE {stop, Reason, State} end; -%% 3.x版本附加到payload里面 +%% 3.x, 4.x 版本附加到payload里面 handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) -> diff --git a/apps/endpoint/src/endpoint_sup.erl b/apps/endpoint/src/endpoint_sup.erl index 2bf517e..bb2478e 100644 --- a/apps/endpoint/src/endpoint_sup.erl +++ b/apps/endpoint/src/endpoint_sup.erl @@ -9,7 +9,6 @@ -include("endpoint.hrl"). -export([start_link/0]). --export([ensured_endpoint_started/1, delete_endpoint/1]). -export([init/1]). @@ -29,32 +28,23 @@ start_link() -> %% modules => modules()} % optional init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - ChildSpecs = [], - {ok, {SupFlags, ChildSpecs}}. + ChildSpecs = [ + #{ + id => endpoint_id_generator, + start => {'endpoint_id_generator', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['endpoint_id_generator'] + }, -%% internal functions - --spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -ensured_endpoint_started(Endpoint = #endpoint{}) -> - case supervisor:start_child(?MODULE, child_spec(Endpoint)) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {error, {'already_started', Pid}} when is_pid(Pid) -> - {ok, Pid}; - {error, Error} -> - {error, Error} - end. - -delete_endpoint(Id) when is_integer(Id) -> - Name = endpoint:get_name(Id), - supervisor:terminate_child(?MODULE, Name), - supervisor:delete_child(?MODULE, Name). - -child_spec(Endpoint = #endpoint{id = Id}) -> - Name = endpoint:get_name(Id), - #{id => Name, - start => {endpoint, start_link, [Name, Endpoint]}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['endpoint']}. \ No newline at end of file + #{ + id => endpoint_client_sup, + start => {'endpoint_client_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['endpoint_client_sup'] + } + ], + {ok, {SupFlags, ChildSpecs}}. \ No newline at end of file diff --git a/rebar.config b/rebar.config index efcd8ce..c6c0b13 100644 --- a/rebar.config +++ b/rebar.config @@ -6,6 +6,7 @@ {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}}, {emqtt, ".*", {git, "https://gitea.s5s8.com/anlicheng/emqtt.git", {branch, "main"}}}, + {brod, ".*", {git, "https://github.com/klarna/brod.git", {tag, "4.4.0"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}.