增加对kafka的支持

This commit is contained in:
anlicheng 2025-06-04 19:09:50 +08:00
parent 29d99f1da4
commit 8e491c0a54
9 changed files with 401 additions and 34 deletions

View File

@ -12,6 +12,7 @@
emqtt,
parse_trans,
lager,
brod,
mnesia,
ssl,
public_key,

View File

@ -21,10 +21,13 @@
-spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}.
start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) ->
Name = get_name(Id),
endpoint_http:start_link(Name, Endpoint);
endpoint_http_client:start_link(Name, Endpoint);
start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) ->
Name = get_name(Id),
endpoint_mqtt:start_link(Name, Endpoint);
endpoint_mqtt_client:start_link(Name, Endpoint);
start_link(Endpoint = #endpoint{id = Id, config = #kafka_endpoint{}}) ->
Name = get_name(Id),
endpoint_kafka_client:start_link(Name, Endpoint);
start_link(Endpoint = #endpoint{id = Id, config = #mysql_endpoint{}}) ->
Name = get_name(Id),
endpoint_mysql:start_link(Name, Endpoint).

View File

@ -0,0 +1,60 @@
%%%-------------------------------------------------------------------
%% @doc endpoint top level supervisor.
%% @end
%%%-------------------------------------------------------------------
-module(endpoint_client_sup).
-behaviour(supervisor).
-include("endpoint.hrl").
-export([start_link/0]).
-export([ensured_endpoint_started/1, delete_endpoint/1]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions
-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensured_endpoint_started(Endpoint = #endpoint{}) ->
case supervisor:start_child(?MODULE, child_spec(Endpoint)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, {'already_started', Pid}} when is_pid(Pid) ->
{ok, Pid};
{error, Error} ->
{error, Error}
end.
delete_endpoint(Id) when is_integer(Id) ->
Name = endpoint:get_name(Id),
supervisor:terminate_child(?MODULE, Name),
supervisor:delete_child(?MODULE, Name).
child_spec(Endpoint = #endpoint{id = Id}) ->
Name = endpoint:get_name(Id),
#{id => Name,
start => {endpoint, start_link, [Name, Endpoint]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['endpoint']}.

View File

@ -6,7 +6,7 @@
%%% @end
%%% Created : 07. 5 2024 11:17
%%%-------------------------------------------------------------------
-module(endpoint_http).
-module(endpoint_http_client).
-author("anlicheng").
-include("endpoint.hrl").

View File

@ -0,0 +1,104 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04. 6 2025 17:55
%%%-------------------------------------------------------------------
-module(endpoint_id_generator).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([next_id/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
id = 1
}).
%%%===================================================================
%%% API
%%%===================================================================
next_id() ->
gen_server:call(?MODULE, next_id).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(next_id, _From, State = #state{id = Id}) ->
{reply, Id, State#state{id = Id +1}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -0,0 +1,208 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(endpoint_kafka_client).
-include("endpoint.hrl").
-behaviour(gen_server).
%% API
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%%
-define(RETRY_INTERVAL, 5000).
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
-record(state, {
endpoint :: #endpoint{},
buffer :: endpoint_buffer:buffer(),
client_id :: atom(),
client_pid :: undefined | pid(),
%% , #{PacketId :: integer() => Id :: integer()}
inflight = #{},
status = disconnected
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Endpoint = #endpoint{id = Id}]) ->
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), create_postman),
%%
Buffer = endpoint_buffer:new(Endpoint, 10),
%% brod的clientId
ClientId = list_to_atom("brod_client_" ++ integer_to_list(Id)),
{ok, #state{client_id = ClientId, endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
Stat = endpoint_buffer:stat(Buffer),
{reply, {ok, Stat}, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
{noreply, State#state{buffer = NBuffer}}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% bootstrap_servers的格式为: [<<"localhost:9092">>, <<"localhost:9093">>]
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId,
endpoint = #endpoint{title = Title, config = #kafka_endpoint{bootstrap_servers = BootstrapServers0, topic = Topic, username = Username, password = Password}}}) ->
BootstrapServers = lists:flatmap(fun(S) ->
case binary:split(S, <<":">>, [global, trim]) of
[Host, Port] ->
[{binary_to_list(Host), binary_to_integer(Port)}];
_ ->
[]
end
end, BootstrapServers0),
lager:debug("[kafka_client] endpoint: ~p, create postman, bootstap_servers: ~p", [Title, BootstrapServers]),
ClientConfig = [{reconnect_cool_down_seconds, 10}],
case brod:start_client(BootstrapServers, ClientId, ClientConfig) of
ok ->
case brod:start_producer(ClientId, Topic, []) of
ok ->
ClientPid = whereis(ClientId),
erlang:monitor(process, ClientPid),
lager:debug("[kafka_client] start producer success, topic: ~p", [Topic]),
NBuffer = endpoint_buffer:trigger_n(Buffer),
{noreply, State#state{client_pid = ClientPid, status = ?CONNECTED, buffer = NBuffer}};
{error, Reason} ->
lager:debug("[kafka_client] start producer, get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman),
{noreply, State}
end;
{error, Reason} ->
lager:debug("[kafka_client] start client, get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman),
{noreply, State}
end;
%% 线
handle_info({next_data, _Id, _Metadata, _Metric}, State = #state{status = ?DISCONNECTED}) ->
{noreply, State};
%% kafka服务器
handle_info({next_data, Id, Metadata, Metric}, State = #state{client_id = ClientId, status = ?CONNECTED, buffer = Buffer,
endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
%% 使service_id左右hash的key
Args = case maps:get(<<"service_id">>, Metadata, undefined) of
undefined ->
[ClientId, Topic, random, <<"">>, Metric];
Key ->
[ClientId, Topic, undefined, Key, Metric]
end,
lager:debug("[kafka_client] will publish topic: ~p, message: ~p", [Topic, Metric]),
case erlang:apply(brod, produce_sync, Args)of
ok ->
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
{error, Reason} ->
lager:warning("[kafka_client] send message to topic: ~p, get error: ~p", [Topic, Reason]),
reconnect_ticker(),
{noreply, State}
end;
%% brod_client
handle_info({'DOWN', _Ref, process, ClientPid, Reason}, State = #state{client_pid = ClientPid, endpoint = #endpoint{title = Title}}) ->
lager:warning("[kafka_client] endpoint: ~p, brod_client conn pid exit with reason: ~p", [Title, Reason]),
reconnect_ticker(),
{noreply, State#state{client_pid = undefined, status = ?DISCONNECTED}};
handle_info(Info, State = #state{status = Status}) ->
lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]),
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
endpoint_buffer:cleanup(Buffer),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec format_props(Props :: [{Key :: any(), Val :: any()}]) -> [{Key1 :: binary(), Val1 :: binary()}].
format_props(Props) when is_list(Props) ->
format_props(Props, []).
format_props([], Acc) ->
Acc;
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_integer(Val) ->
format_props(T, [{Key, integer_to_binary(Val)}|Acc]);
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_float(Val) ->
NVal = list_to_binary(float_to_list(Val, [{decimals, 2}, compact])),
format_props(T, [{Key, NVal}|Acc]);
format_props([{Key, Val}|T], Acc) when is_binary(Key), is_binary(Val) ->
format_props(T, [{Key, Val}|Acc]);
format_props([_|T], Acc) ->
format_props(T, Acc).
-spec reconnect_ticker() -> no_return().
reconnect_ticker() ->
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman).

View File

@ -7,7 +7,7 @@
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(endpoint_mqtt).
-module(endpoint_mqtt_client).
-include("endpoint.hrl").
-behaviour(gen_server).
@ -151,7 +151,7 @@ handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTE
{stop, Reason, State}
end;
%% 3.x版本附加到payload里面
%% 3.x, 4.x payload里面
handle_info({next_data, Id, Metadata, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) ->

View File

@ -9,7 +9,6 @@
-include("endpoint.hrl").
-export([start_link/0]).
-export([ensured_endpoint_started/1, delete_endpoint/1]).
-export([init/1]).
@ -29,32 +28,23 @@ start_link() ->
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions
-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensured_endpoint_started(Endpoint = #endpoint{}) ->
case supervisor:start_child(?MODULE, child_spec(Endpoint)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, {'already_started', Pid}} when is_pid(Pid) ->
{ok, Pid};
{error, Error} ->
{error, Error}
end.
delete_endpoint(Id) when is_integer(Id) ->
Name = endpoint:get_name(Id),
supervisor:terminate_child(?MODULE, Name),
supervisor:delete_child(?MODULE, Name).
child_spec(Endpoint = #endpoint{id = Id}) ->
Name = endpoint:get_name(Id),
#{id => Name,
start => {endpoint, start_link, [Name, Endpoint]},
ChildSpecs = [
#{
id => endpoint_id_generator,
start => {'endpoint_id_generator', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['endpoint']}.
modules => ['endpoint_id_generator']
},
#{
id => endpoint_client_sup,
start => {'endpoint_client_sup', start_link, []},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => ['endpoint_client_sup']
}
],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -6,6 +6,7 @@
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}},
{mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}},
{emqtt, ".*", {git, "https://gitea.s5s8.com/anlicheng/emqtt.git", {branch, "main"}}},
{brod, ".*", {git, "https://github.com/klarna/brod.git", {tag, "4.4.0"}}},
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
]}.