fix endpoint

anlicheng 2025-08-18 21:06:30 +08:00
parent 75efadf217
commit d54e3852bb
12 changed files with 117 additions and 82 deletions

View File

@ -69,7 +69,7 @@ parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Passwo
false
end
end, BootstrapServers0),
Mechanism = case Mechanism of
Mechanism = case Mechanism0 of
<<"sha_256">> ->
scram_sha_256;
<<"sha_512">> ->

View File

@ -28,7 +28,11 @@ start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}}
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_mqtt:start_link(LocalName, AliasName, Endpoint).
endpoint_mqtt:start_link(LocalName, AliasName, Endpoint);
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #kafka_endpoint{}}) ->
LocalName = get_name(Id),
AliasName = get_alias_name(Name),
endpoint_kafka:start_link(LocalName, AliasName, Endpoint).
-spec get_name(Id :: integer()) -> atom().
get_name(Id) when is_integer(Id) ->
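The new clause gives start_link/1 one head per protocol, selected by the record in config. A hypothetical, more compact equivalent (not what the diff does; endpoint_http as the HTTP callback module is an assumption) would resolve the callback module once:

    endpoint_module(#http_endpoint{})  -> endpoint_http;
    endpoint_module(#mqtt_endpoint{})  -> endpoint_mqtt;
    endpoint_module(#kafka_endpoint{}) -> endpoint_kafka.

    start_link(Endpoint = #endpoint{id = Id, name = Name, config = Config}) ->
        Mod = endpoint_module(Config),
        Mod:start_link(get_name(Id), get_alias_name(Name), Endpoint).

The per-clause form in the diff keeps each protocol explicit, which is arguably clearer while there are only three of them.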

View File

@ -45,7 +45,7 @@
new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 ->
%%
EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)),
Tid = ets:new(EtsName, [ordered_set, private]),
Tid = ets:new(EtsName, [ordered_set, private, {keypos, 2}]),
%%
{ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL),
@ -75,6 +75,7 @@ trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid,
'$end_of_table' ->
Buffer;
NKey ->
lager:debug("nkey: ~p", [NKey]),
[#north_data{id = Id, tuple = Tuple}] = ets:lookup(Tid, NKey),
ReceiverPid = self(),
ReceiverPid ! {next_data, Id, Tuple},
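The {keypos, 2} option is what makes the ets:lookup(Tid, NKey) above resolve by id: with the default key position (1), the key of every stored #north_data{} would be the record tag itself, so all inserts would collide on one key. A minimal sketch, assuming #north_data{} has only the two fields visible in the diff:

    -record(north_data, {id, tuple}).

    buffer_demo() ->
        %% #north_data.id evaluates to 2, matching the literal {keypos, 2} in the diff
        Tid = ets:new(buffer_demo, [ordered_set, private, {keypos, #north_data.id}]),
        true = ets:insert(Tid, #north_data{id = 1, tuple = {svc, <<"m1">>}}),
        true = ets:insert(Tid, #north_data{id = 2, tuple = {svc, <<"m2">>}}),
        [#north_data{id = 1}] = ets:lookup(Tid, 1),
        2 = ets:next(Tid, 1),          %% ordered_set lets the cursor walk ids in order
        ets:delete(Tid).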

View File

@ -97,12 +97,10 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
ClientConfig = [
{reconnect_cool_down_seconds, 5},
{sasl, [
{mechanism, Mechanism},
{username, Username},
{password, Password}
]},
{socket_options, [{keepalive, true}]}
%{sasl, {Mechanism, Username, Password}},
{socket_options, [
{keepalive, true}
]}
],
case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, ClientPid} ->
@ -117,7 +115,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
%% Ignore forwarded data while the Kafka client is still disconnected
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
{keep_state, State};
{noreply, State};
%% Connected: produce the metric to the Kafka broker
handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid,
endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
@ -127,7 +125,8 @@ handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONN
lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]),
ReceiverPid ! {ack, Id}
end,
ok = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb),
Res = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb),
lager:debug("[endpoint_kafka] produce_cb res: ~p", [Res]),
{noreply, State};
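Two notes on this hunk. First, the structured sasl list that was removed is not the shape brod documents for its client config; brod's sasl option is a {Mechanism, Username, Password} tuple, which matches the now commented-out line. Second, with a partitioner such as random instead of a fixed partition, brod:produce_cb/6 can return {ok, Partition} rather than ok, which is likely why the strict ok = match was loosened to Res =. A minimal connect-and-produce sketch under those assumptions (all names are placeholders):

    connect_and_send(BootstrapServers, ClientId, Mechanism, Username, Password, Topic, Metric) ->
        ClientConfig = [
            {reconnect_cool_down_seconds, 5},
            {sasl, {Mechanism, Username, Password}},   %% e.g. {scram_sha_256, <<"admin">>, <<"...">>}
            %% producing needs a producer for the topic; auto_start_producers (or an
            %% explicit brod:start_producer/3) avoids a producer_not_found error
            {auto_start_producers, true},
            {socket_options, [{keepalive, true}]}
        ],
        {ok, ClientPid} = brod:start_link_client(BootstrapServers, ClientId, ClientConfig),
        Self = self(),
        AckCb = fun(Partition, BaseOffset) -> Self ! {acked, Partition, BaseOffset} end,
        case brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb) of
            ok               -> ok;
            {ok, _Partition} -> ok;
            {error, Reason}  -> {error, Reason}
        end.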

View File

@ -12,6 +12,7 @@
-export([ensured_endpoint_started/1, delete_endpoint/1]).
-export([init/1]).
-export([start_kafka_test/0]).
-define(SERVER, ?MODULE).
@ -42,6 +43,41 @@ init([]) ->
%% internal functions
start_kafka_test() ->
Endpoint = #endpoint{
id = 1,
%%
name = <<"kafka_test">>,
%%
title = <<"kafka测试"/utf8>>,
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #kafka_endpoint{
username = <<"admin">>,
password = <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">>,
mechanism = scram_sha_256,
bootstrap_servers = [
{"127.0.0.1", 19092}
],
topic = <<"metric">>
},
status = 0,
updated_at = 0,
created_at = 0
},
{ok, Pid} = ensured_endpoint_started(Endpoint),
ServiceId = <<"service_id_123">>,
Metric = <<"this is a test">>,
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric),
endpoint:forward(Pid, ServiceId, Metric).
-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensured_endpoint_started(Endpoint = #endpoint{}) ->
case supervisor:start_child(?MODULE, child_spec(Endpoint)) of
@ -62,7 +98,7 @@ delete_endpoint(Id) when is_integer(Id) ->
child_spec(Endpoint = #endpoint{id = Id}) ->
Name = endpoint:get_name(Id),
#{id => Name,
start => {endpoint, start_link, [Name, Endpoint]},
start => {endpoint, start_link, [Endpoint]},
restart => permanent,
shutdown => 2000,
type => worker,
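The start MFA now matches the one-argument endpoint:start_link/1 shown above; keeping [Name, Endpoint] would have the supervisor call endpoint:start_link/2, which does not match those single-argument clauses, so every child would fail to start. A sketch of how such a spec is typically consumed (the already_started handling is a common pattern, not necessarily what ensured_endpoint_started/1 does internally):

    start_endpoint(Sup, Endpoint = #endpoint{id = Id}) ->
        Spec = #{id       => endpoint:get_name(Id),
                 start    => {endpoint, start_link, [Endpoint]},
                 restart  => permanent,
                 shutdown => 2000,
                 type     => worker},
        case supervisor:start_child(Sup, Spec) of
            {ok, Pid}                       -> {ok, Pid};
            {error, {already_started, Pid}} -> {ok, Pid};
            {error, Reason}                 -> {error, Reason}
        end.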

View File

@ -1,26 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04 Jul 2023 11:30
%%%-------------------------------------------------------------------
-module(iot_ai_router).
-author("aresei").
-include("iot.hrl").
%% API
-export([route_uuid/3]).
-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return().
route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) ->
%%
case redis_client:hget(RouterUUID, <<"location_code">>) of
{ok, undefined} ->
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
{ok, LocationCode} when is_binary(LocationCode) ->
iot_jinzhi_endpoint:forward(LocationCode, EventType, Params);
{error, Reason} ->
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
end.

View File

@ -1,39 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17 Apr 2023 16:41
%%%-------------------------------------------------------------------
-module(iot_config).
-author("licheng5").
%% API
-export([emqt_opts/1]).
emqt_opts(ClientSuffix) when is_binary(ClientSuffix) ->
%% connection settings for the EMQX server
{ok, Props} = application:get_env(iot, emqx_server),
EMQXHost = proplists:get_value(host, Props),
EMQXPort = proplists:get_value(port, Props, 18080),
Username = proplists:get_value(username, Props),
Password = proplists:get_value(password, Props),
RetryInterval = proplists:get_value(retry_interval, Props, 5),
Keepalive = proplists:get_value(keepalive, Props, 86400),
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>,
[
{clientid, ClientId},
{host, EMQXHost},
{port, EMQXPort},
{owner, self()},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, Keepalive},
{auto_ack, true},
{proto_ver, v5},
{retry_interval, RetryInterval}
].

View File

@ -51,7 +51,7 @@ start_link() ->
{stop, Reason :: term()} | ignore).
init([]) ->
%% set up the connection to the EMQX server
Opts = iot_config:emqt_opts(<<"host-subscriber">>),
Opts = emqt_opts(<<"host-subscriber">>),
lager:debug("[opts] is: ~p", [Opts]),
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
@ -146,3 +146,29 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
emqt_opts(ClientSuffix) when is_binary(ClientSuffix) ->
%% connection settings for the EMQX server
{ok, Props} = application:get_env(iot, emqx_server),
EMQXHost = proplists:get_value(host, Props),
EMQXPort = proplists:get_value(port, Props, 18080),
Username = proplists:get_value(username, Props),
Password = proplists:get_value(password, Props),
RetryInterval = proplists:get_value(retry_interval, Props, 5),
Keepalive = proplists:get_value(keepalive, Props, 86400),
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>,
[
{clientid, ClientId},
{host, EMQXHost},
{port, EMQXPort},
{owner, self()},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, Keepalive},
{auto_ack, true},
{proto_ver, v5},
{retry_interval, RetryInterval}
].
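Inlining emqt_opts/1 here removes the dependency on the deleted iot_config module. A minimal sketch of how these options are typically consumed with emqtt (the topic and QoS are placeholders, not taken from the diff); since the options set {owner, self()} and {auto_ack, true}, incoming publishes are delivered to the owner's mailbox as {publish, Msg} messages:

    connect_demo() ->
        Opts = emqt_opts(<<"host-subscriber">>),
        {ok, ConnPid} = emqtt:start_link(Opts),
        case emqtt:connect(ConnPid) of
            {ok, _Props} ->
                {ok, _SubProps, _ReasonCodes} =
                    emqtt:subscribe(ConnPid, {<<"iot/host/#">>, 1}),   %% placeholder topic
                {ok, ConnPid};
            {error, Reason} ->
                {error, Reason}
        end.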

View File

@ -100,7 +100,7 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({'DOWN', MRef, process, Pid, Reason}, State = #state{refs = Refs, pid_names = PidNames}) ->
lager:debug("[iot_name_server] pid: ~p, down with reason: ~p", [Reason]),
% lager:debug("[iot_name_server] pid: ~p, down with reason: ~p", [Reason]),
case lists:member(MRef, Refs) of
true ->
case maps:take(Pid, PidNames) of

View File

@ -25,7 +25,7 @@ start_link() ->
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
SupFlags = #{strategy => one_for_one, intensity => 10, period => 36},
Specs = [
#{

View File

@ -0,0 +1,34 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 18 Aug 2025 18:40
%%%-------------------------------------------------------------------
-module(http_client).
-author("anlicheng").
%% API
-export([post/3]).
%% Headers = [
%% {<<"content-type">>, <<"application/json">>}
%% ]
-spec post(Url :: string(), Headers :: list(), Body :: binary()) -> {ok, RespBody :: binary()} | {error, Reason :: any()}.
post(Url, Headers, Body) when is_list(Url), is_list(Headers), is_binary(Body) ->
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
lager:debug("[iot_api] send body: ~p, get error is: ~p", [Body, RespBody]),
hackney:close(ClientRef),
{ok, RespBody};
{ok, HttpCode, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]),
{error, {HttpCode, RespBody}};
{error, Reason} ->
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]),
{error, Reason}
end.
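A usage sketch for the new helper; the URL and JSON payload are placeholders, not taken from the diff. Since the request is made with {pool, false}, each call opens and closes its own connection:

    send_event_demo() ->
        Url = "http://127.0.0.1:18090/api/v1/events",
        Headers = [{<<"content-type">>, <<"application/json">>}],
        Body = <<"{\"type\":1,\"msg\":\"hello\"}">>,
        case http_client:post(Url, Headers, Body) of
            {ok, RespBody}  -> {ok, RespBody};
            {error, Reason} -> {error, Reason}
        end.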

View File

@ -1,14 +1,14 @@
[
{iot, [
{http_server, [
{port, 18080},
{port, 18090},
{acceptors, 500},
{max_connections, 10240},
{backlog, 10240}
]},
{tcp_server, [
{port, 18082},
{port, 18092},
{acceptors, 500},
{max_connections, 10240},
{backlog, 10240}