From d54e3852bb05baaeb41452e91272448c6ba60773 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 18 Aug 2025 21:06:30 +0800 Subject: [PATCH] fix endpoint --- apps/iot/src/database/endpoint_bo.erl | 2 +- apps/iot/src/endpoint/endpoint.erl | 6 +++- apps/iot/src/endpoint/endpoint_buffer.erl | 3 +- apps/iot/src/endpoint/endpoint_kafka.erl | 15 ++++----- apps/iot/src/endpoint/endpoint_sup.erl | 38 +++++++++++++++++++++- apps/iot/src/iot_ai_router.erl | 26 --------------- apps/iot/src/iot_config.erl | 39 ----------------------- apps/iot/src/iot_mqtt_subscriber.erl | 28 +++++++++++++++- apps/iot/src/iot_name_server.erl | 2 +- apps/iot/src/iot_sup.erl | 2 +- apps/iot/src/util/http_client.erl | 34 ++++++++++++++++++++ config/sys-dev.config | 4 +-- 12 files changed, 117 insertions(+), 82 deletions(-) delete mode 100644 apps/iot/src/iot_ai_router.erl delete mode 100644 apps/iot/src/iot_config.erl create mode 100644 apps/iot/src/util/http_client.erl diff --git a/apps/iot/src/database/endpoint_bo.erl b/apps/iot/src/database/endpoint_bo.erl index e05ef97..34d4b28 100644 --- a/apps/iot/src/database/endpoint_bo.erl +++ b/apps/iot/src/database/endpoint_bo.erl @@ -69,7 +69,7 @@ parse_config(<<"kafka">>, #{<<"username">> := Username, <<"password">> := Passwo false end end, BootstrapServers0), - Mechanism = case Mechanism of + Mechanism = case Mechanism0 of <<"sha_256">> -> scram_sha_256; <<"sha_512">> -> diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index d7b7c76..35fce43 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -28,7 +28,11 @@ start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}} start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) -> LocalName = get_name(Id), AliasName = get_alias_name(Name), - endpoint_mqtt:start_link(LocalName, AliasName, Endpoint). + endpoint_mqtt:start_link(LocalName, AliasName, Endpoint); +start_link(Endpoint = #endpoint{id = Id, name = Name, config = #kafka_endpoint{}}) -> + LocalName = get_name(Id), + AliasName = get_alias_name(Name), + endpoint_kafka:start_link(LocalName, AliasName, Endpoint). -spec get_name(Id :: integer()) -> atom(). get_name(Id) when is_integer(Id) -> diff --git a/apps/iot/src/endpoint/endpoint_buffer.erl b/apps/iot/src/endpoint/endpoint_buffer.erl index 59d4400..0c32f41 100644 --- a/apps/iot/src/endpoint/endpoint_buffer.erl +++ b/apps/iot/src/endpoint/endpoint_buffer.erl @@ -45,7 +45,7 @@ new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 -> %% 初始化存储 EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)), - Tid = ets:new(EtsName, [ordered_set, private]), + Tid = ets:new(EtsName, [ordered_set, private, {keypos, 2}]), %% 定义重发器 {ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL), @@ -75,6 +75,7 @@ trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, '$end_of_table' -> Buffer; NKey -> + lager:debug("nkey: ~p", [NKey]), [#north_data{id = Id, tuple = Tuple}] = ets:lookup(Tid, NKey), ReceiverPid = self(), ReceiverPid ! {next_data, Id, Tuple}, diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index bcb92ab..086b41d 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -97,12 +97,10 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS ClientConfig = [ {reconnect_cool_down_seconds, 5}, - {sasl, [ - {mechanism, Mechanism}, - {username, Username}, - {password, Password} - ]}, - {socket_options, [{keepalive, true}]} + %{sasl, {Mechanism, Username, Password}}, + {socket_options, [ + {keepalive, true} + ]} ], case brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of {ok, ClientPid} -> @@ -117,7 +115,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS %% 离线时,忽略数据发送逻辑 handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> - {keep_state, State}; + {noreply, State}; %% 发送数据到mqtt服务器 handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid, endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> @@ -127,7 +125,8 @@ handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONN lager:debug("[endpoint_kafka] ack partion: ~p, offset: ~p", [Partition, BaseOffset]), ReceiverPid ! {ack, Id} end, - ok = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb), + Res = brod:produce_cb(ClientPid, Topic, random, <<>>, Metric, AckCb), + lager:debug("[endpoint_kafka] produce_cb res: ~p", [Res]), {noreply, State}; diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index a2beb50..fce7e19 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -12,6 +12,7 @@ -export([ensured_endpoint_started/1, delete_endpoint/1]). -export([init/1]). +-export([start_kafka_test/0]). -define(SERVER, ?MODULE). @@ -42,6 +43,41 @@ init([]) -> %% internal functions +start_kafka_test() -> + Endpoint = #endpoint{ + id = 1, + %% 全局唯一,在路由规则中通过名称来指定 + name = <<"kafka_test">>, + %% 标题描述 + title = <<"kafka测试"/utf8>>, + %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} + config = #kafka_endpoint{ + username = <<"admin">>, + password = <<"lz4rP5UavRTiGZEZK8G51mxHcM5iPC">>, + mechanism = scram_sha_256, + bootstrap_servers = [ + {"127.0.0.1", 19092} + ], + topic = <<"metric">> + }, + status = 0, + updated_at = 0, + created_at = 0 + }, + {ok, Pid} = ensured_endpoint_started(Endpoint), + ServiceId = <<"service_id_123">>, + Metric = <<"this is a test">>, + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric), + endpoint:forward(Pid, ServiceId, Metric). + -spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. ensured_endpoint_started(Endpoint = #endpoint{}) -> case supervisor:start_child(?MODULE, child_spec(Endpoint)) of @@ -62,7 +98,7 @@ delete_endpoint(Id) when is_integer(Id) -> child_spec(Endpoint = #endpoint{id = Id}) -> Name = endpoint:get_name(Id), #{id => Name, - start => {endpoint, start_link, [Name, Endpoint]}, + start => {endpoint, start_link, [Endpoint]}, restart => permanent, shutdown => 2000, type => worker, diff --git a/apps/iot/src/iot_ai_router.erl b/apps/iot/src/iot_ai_router.erl deleted file mode 100644 index 964dba3..0000000 --- a/apps/iot/src/iot_ai_router.erl +++ /dev/null @@ -1,26 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 11:30 -%%%------------------------------------------------------------------- --module(iot_ai_router). --author("aresei"). --include("iot.hrl"). - -%% API --export([route_uuid/3]). - --spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) -> - %% 查找终端设备对应的点位信息 - case redis_client:hget(RouterUUID, <<"location_code">>) of - {ok, undefined} -> - lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); - {ok, LocationCode} when is_binary(LocationCode) -> - iot_jinzhi_endpoint:forward(LocationCode, EventType, Params); - {error, Reason} -> - lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]) - end. \ No newline at end of file diff --git a/apps/iot/src/iot_config.erl b/apps/iot/src/iot_config.erl deleted file mode 100644 index bb7238f..0000000 --- a/apps/iot/src/iot_config.erl +++ /dev/null @@ -1,39 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 17. 4月 2023 16:41 -%%%------------------------------------------------------------------- --module(iot_config). --author("licheng5"). - -%% API --export([emqt_opts/1]). - -emqt_opts(ClientSuffix) when is_binary(ClientSuffix) -> - %% 建立到emqx服务器的连接 - {ok, Props} = application:get_env(iot, emqx_server), - EMQXHost = proplists:get_value(host, Props), - EMQXPort = proplists:get_value(port, Props, 18080), - Username = proplists:get_value(username, Props), - Password = proplists:get_value(password, Props), - RetryInterval = proplists:get_value(retry_interval, Props, 5), - Keepalive = proplists:get_value(keepalive, Props, 86400), - - Node = atom_to_binary(node()), - ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>, - [ - {clientid, ClientId}, - {host, EMQXHost}, - {port, EMQXPort}, - {owner, self()}, - {tcp_opts, []}, - {username, Username}, - {password, Password}, - {keepalive, Keepalive}, - {auto_ack, true}, - {proto_ver, v5}, - {retry_interval, RetryInterval} - ]. diff --git a/apps/iot/src/iot_mqtt_subscriber.erl b/apps/iot/src/iot_mqtt_subscriber.erl index 6992481..797b6a2 100644 --- a/apps/iot/src/iot_mqtt_subscriber.erl +++ b/apps/iot/src/iot_mqtt_subscriber.erl @@ -51,7 +51,7 @@ start_link() -> {stop, Reason :: term()} | ignore). init([]) -> %% 建立到emqx服务器的连接 - Opts = iot_config:emqt_opts(<<"host-subscriber">>), + Opts = emqt_opts(<<"host-subscriber">>), lager:debug("[opts] is: ~p", [Opts]), case emqtt:start_link(Opts) of {ok, ConnPid} -> @@ -146,3 +146,29 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +emqt_opts(ClientSuffix) when is_binary(ClientSuffix) -> + %% 建立到emqx服务器的连接 + {ok, Props} = application:get_env(iot, emqx_server), + EMQXHost = proplists:get_value(host, Props), + EMQXPort = proplists:get_value(port, Props, 18080), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + RetryInterval = proplists:get_value(retry_interval, Props, 5), + Keepalive = proplists:get_value(keepalive, Props, 86400), + + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>, + [ + {clientid, ClientId}, + {host, EMQXHost}, + {port, EMQXPort}, + {owner, self()}, + {tcp_opts, []}, + {username, Username}, + {password, Password}, + {keepalive, Keepalive}, + {auto_ack, true}, + {proto_ver, v5}, + {retry_interval, RetryInterval} + ]. diff --git a/apps/iot/src/iot_name_server.erl b/apps/iot/src/iot_name_server.erl index c98fc41..92e7a33 100644 --- a/apps/iot/src/iot_name_server.erl +++ b/apps/iot/src/iot_name_server.erl @@ -100,7 +100,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({'DOWN', MRef, process, Pid, Reason}, State = #state{refs = Refs, pid_names = PidNames}) -> - lager:debug("[iot_name_server] pid: ~p, down with reason: ~p", [Reason]), + % lager:debug("[iot_name_server] pid: ~p, down with reason: ~p", [Reason]), case lists:member(MRef, Refs) of true -> case maps:take(Pid, PidNames) of diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 02a5395..9c3b9ff 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -25,7 +25,7 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + SupFlags = #{strategy => one_for_one, intensity => 10, period => 36}, Specs = [ #{ diff --git a/apps/iot/src/util/http_client.erl b/apps/iot/src/util/http_client.erl new file mode 100644 index 0000000..fb064ab --- /dev/null +++ b/apps/iot/src/util/http_client.erl @@ -0,0 +1,34 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 18. 8月 2025 18:40 +%%%------------------------------------------------------------------- +-module(http_client). +-author("anlicheng"). + +%% API +-export([post/3]). + +%% Headers = [ +%% {<<"content-type">>, <<"application/json">>} +%% ] +-spec post(Url :: string(), Headers :: list(), Body :: binary()) -> {ok, RespBody :: binary()} | {error, Reason :: any()}. +post(Url, Headers, Body) when is_list(Url), is_list(Headers), is_binary(Body) -> + case hackney:request(post, Url, Headers, Body, [{pool, false}]) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[iot_api] send body: ~p, get error is: ~p", [Body, RespBody]), + hackney:close(ClientRef), + {ok, RespBody}; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]), + {error, {HttpCode, RespBody}}; + {error, Reason} -> + lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]), + {error, Reason} + end. \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index edf5fcf..6b22730 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -1,14 +1,14 @@ [ {iot, [ {http_server, [ - {port, 18080}, + {port, 18090}, {acceptors, 500}, {max_connections, 10240}, {backlog, 10240} ]}, {tcp_server, [ - {port, 18082}, + {port, 18092}, {acceptors, 500}, {max_connections, 10240}, {backlog, 10240}